From 966e55011f762433b126d57aaccd0dac2db0bb85 Mon Sep 17 00:00:00 2001
From: Xin Hao Zhang
Date: Tue, 5 Dec 2023 11:22:03 -0500
Subject: [PATCH] telemetry: track telemetry transactions through conn executor

This change simplifies the telemetry transaction sampling process.
Previously, the telemetry logging struct tracked sampled transactions
via a map of execution ids: if a transaction was chosen for sampling,
we would insert an entry into this map, and for each statement we would
check the map to see whether the statement belonged to a tracked
transaction.

Instead of using a map, we can mark the transaction as being sampled by
telemetry in the conn executor. This removes the need for concurrent
data structure access when the transaction is marked as sampled, since
each statement no longer needs to read an entry from the shared map. It
also removes the need to track the number of transactions currently
being sampled for telemetry, which existed only to bound the memory
used by the map.

We now determine whether the transaction should be logged to telemetry
at the start of transaction execution or at the start of a transaction
restart. The transaction is marked for telemetry logging if enough time
has elapsed since the last transaction was sampled, or if session
tracing is on. Transaction statements are logged according to the
following settings:
- sql.telemetry.transaction_sampling.max_event_frequency controls the
  frequency at which we sample transactions. If a transaction is marked
  to be sampled by telemetry, we log all of its statement execution
  events to telemetry, up to a maximum of
  `sql.telemetry.transaction_sampling.statement_events_per_transaction.max`
  statements.
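Illustratively, the frequency gate described above amounts to the
following (a minimal standalone sketch, not the patch's code; the
helper name `shouldSampleTxn` and its parameters are invented for
illustration — the actual logic lives in shouldEmitTransactionLog in
pkg/sql/telemetry_logging.go):

    import "time"

    // shouldSampleTxn sketches the sampling decision: a transaction is
    // sampled when tracing is on, or when at least 1/maxEventFrequency
    // seconds have passed since the last sampled transaction.
    func shouldSampleTxn(now, lastSampled time.Time, maxEventFrequency int64, tracingOn bool) bool {
        if tracingOn {
            return true
        }
        if maxEventFrequency == 0 {
            return false // a frequency of 0 disables transaction sampling
        }
        required := time.Second / time.Duration(maxEventFrequency)
        return now.Sub(lastSampled) >= required
    }

With the default frequency of 8, at most one transaction is sampled
every 125ms.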
Part of: #108284

Release note (ops change): New cluster settings:
- sql.telemetry.transaction_sampling.statement_events_per_transaction.max:
  controls the maximum number of statement events to emit per sampled
  transaction for TELEMETRY
- sql.telemetry.transaction_sampling.max_event_frequency: controls the
  maximum frequency at which we sample transactions for telemetry
---
 .../settings/settings-for-tenants.txt |   7 +-
 docs/generated/settings/settings.html |   7 +-
 pkg/sql/conn_executor.go              |  17 +-
 pkg/sql/conn_executor_exec.go         |  17 +-
 pkg/sql/exec_log.go                   | 266 ++++-----
 pkg/sql/telemetry_logging.go          | 271 +++++----
 pkg/sql/telemetry_logging_test.go     | 517 +-----------------
 7 files changed, 351 insertions(+), 751 deletions(-)

diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index b07f803788f2..1a1890b8f3b6 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -299,9 +299,10 @@ sql.stats.system_tables.enabled boolean true when true, enables use of statistic
 sql.stats.system_tables_autostats.enabled boolean true when true, enables automatic collection of statistics on system tables application
 sql.telemetry.query_sampling.enabled boolean false when set to true, executed queries will emit an event on the telemetry logging channel application
 sql.telemetry.query_sampling.internal.enabled boolean false when set to true, internal queries will be sampled in telemetry logging application
-sql.telemetry.query_sampling.max_event_frequency integer 8 the max event frequency at which we sample executions for telemetry, note that it is recommended that this value shares a log-line limit of 10 logs per second on the telemetry pipeline with all other telemetry events. If sampling mode is set to 'transaction', all statements associated with a single transaction are counted as 1 unit. application
-sql.telemetry.query_sampling.mode enumeration statement the execution level used for telemetry sampling. If set to 'statement', events are sampled at the statement execution level. If set to 'transaction', events are sampled at the txn execution level, i.e. all statements for a txn will be logged and are counted together as one sampled event (events are still emitted one per statement) [statement = 0, transaction = 1] application
-sql.telemetry.txn_mode.tracking_limit integer 10000 the maximum number of transactions tracked at one time for which we will send all statements to telemetry application
+sql.telemetry.query_sampling.max_event_frequency integer 8 the max event frequency (events per second) at which we sample executions for telemetry, note that it is recommended that this value shares a log-line limit of 10 logs per second on the telemetry pipeline with all other telemetry events. If sampling mode is set to 'transaction', this value is ignored. application
+sql.telemetry.query_sampling.mode enumeration statement the execution level used for telemetry sampling. If set to 'statement', events are sampled at the statement execution level. If set to 'transaction', events are sampled at the transaction execution level, i.e. all statements for a transaction will be logged and are counted together as one sampled event (events are still emitted one per statement). [statement = 0, transaction = 1] application
+sql.telemetry.transaction_sampling.max_event_frequency integer 8 the max event frequency (events per second) at which we sample transactions for telemetry.
If sampling mode is set to 'statement', this setting is ignored. In practice, this means that we only sample a transaction if 1/max_event_frequency seconds have elapsed since the last transaction was sampled. application +sql.telemetry.transaction_sampling.statement_events_per_transaction.max integer 50 the maximum number of statement events to log for every sampled transaction. Note that statements that are logged by force do not adhere to this limit. application sql.temp_object_cleaner.cleanup_interval duration 30m0s how often to clean up orphaned temporary objects application sql.temp_object_cleaner.wait_interval duration 30m0s how long after creation a temporary object will be cleaned up application sql.log.all_statements.enabled boolean false set to true to enable logging of all executed statements application diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 37a3281237aa..cae93861462d 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -249,9 +249,10 @@
 sql.stats.system_tables_autostats.enabled | boolean | true | when true, enables automatic collection of statistics on system tables | Serverless/Dedicated/Self-Hosted
 sql.telemetry.query_sampling.enabled | boolean | false | when set to true, executed queries will emit an event on the telemetry logging channel | Serverless/Dedicated/Self-Hosted
 sql.telemetry.query_sampling.internal.enabled | boolean | false | when set to true, internal queries will be sampled in telemetry logging | Serverless/Dedicated/Self-Hosted
-sql.telemetry.query_sampling.max_event_frequency | integer | 8 | the max event frequency at which we sample executions for telemetry, note that it is recommended that this value shares a log-line limit of 10 logs per second on the telemetry pipeline with all other telemetry events. If sampling mode is set to 'transaction', all statements associated with a single transaction are counted as 1 unit. | Serverless/Dedicated/Self-Hosted
-sql.telemetry.query_sampling.mode | enumeration | statement | the execution level used for telemetry sampling. If set to 'statement', events are sampled at the statement execution level. If set to 'transaction', events are sampled at the txn execution level, i.e. all statements for a txn will be logged and are counted together as one sampled event (events are still emitted one per statement) [statement = 0, transaction = 1] | Serverless/Dedicated/Self-Hosted
-sql.telemetry.txn_mode.tracking_limit | integer | 10000 | the maximum number of transactions tracked at one time for which we will send all statements to telemetry | Serverless/Dedicated/Self-Hosted
+sql.telemetry.query_sampling.max_event_frequency | integer | 8 | the max event frequency (events per second) at which we sample executions for telemetry, note that it is recommended that this value shares a log-line limit of 10 logs per second on the telemetry pipeline with all other telemetry events. If sampling mode is set to 'transaction', this value is ignored. | Serverless/Dedicated/Self-Hosted
+sql.telemetry.query_sampling.mode | enumeration | statement | the execution level used for telemetry sampling. If set to 'statement', events are sampled at the statement execution level. If set to 'transaction', events are sampled at the transaction execution level, i.e. all statements for a transaction will be logged and are counted together as one sampled event (events are still emitted one per statement). [statement = 0, transaction = 1] | Serverless/Dedicated/Self-Hosted
+sql.telemetry.transaction_sampling.max_event_frequency | integer | 8 | the max event frequency (events per second) at which we sample transactions for telemetry. If sampling mode is set to 'statement', this setting is ignored. In practice, this means that we only sample a transaction if 1/max_event_frequency seconds have elapsed since the last transaction was sampled. | Serverless/Dedicated/Self-Hosted
+sql.telemetry.transaction_sampling.statement_events_per_transaction.max | integer | 50 | the maximum number of statement events to log for every sampled transaction. Note that statements that are logged by force do not adhere to this limit. | Serverless/Dedicated/Self-Hosted
 sql.temp_object_cleaner.cleanup_interval | duration | 30m0s | how often to clean up orphaned temporary objects | Serverless/Dedicated/Self-Hosted
 sql.temp_object_cleaner.wait_interval | duration | 30m0s | how long after creation a temporary object will be cleaned up | Serverless/Dedicated/Self-Hosted
 sql.log.all_statements.enabled | boolean | false | set to true to enable logging of all executed statements | Serverless/Dedicated/Self-Hosted
diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go
index 0c15ff38925b..046db3b53207 100644
--- a/pkg/sql/conn_executor.go
+++ b/pkg/sql/conn_executor.go
@@ -464,7 +464,6 @@ func NewServer(
 	}
 
 	telemetryLoggingMetrics := newTelemetryLoggingMetrics(cfg.TelemetryLoggingTestingKnobs, cfg.Settings)
-	telemetryLoggingMetrics.registerOnTelemetrySamplingModeChange(cfg.Settings)
 	s.TelemetryLoggingMetrics = telemetryLoggingMetrics
 
 	sqlStatsInternalExecutorMonitor := MakeInternalExecutorMemMonitor(MemoryMetrics{}, s.GetExecutorConfig().Settings)
@@ -1566,6 +1565,13 @@ type connExecutor struct {
 		// createdSequences keeps track of sequences created in the current transaction.
 		// The map key is the sequence descpb.ID.
 		createdSequences map[descpb.ID]struct{}
+
+		// shouldLogToTelemetry indicates if the current transaction should be
+		// logged to telemetry. It is used in telemetry transaction sampling
+		// mode to emit all statement events for a particular transaction.
+		// Note that the number of statement events emitted per transaction is
+		// capped by the cluster setting telemetryStatementsPerTransactionMax.
+		shouldLogToTelemetry bool
 	}
 
 	// sessionDataStack contains the user-configurable connection variables.
@@ -2014,7 +2020,11 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent, pay
 		ex.state.mu.Lock()
 		defer ex.state.mu.Unlock()
 		ex.state.mu.stmtCount = 0
+		tracingEnabled := ex.planner.ExtendedEvalContext().Tracing.Enabled()
+		ex.extraTxnState.shouldLogToTelemetry =
+			ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(tracingEnabled, ex.executorType == executorTypeInternal)
 	}
+
 	// NOTE: on txnRestart we don't need to muck with the savepoints stack. It's either
 	// a ROLLBACK TO SAVEPOINT that generated the event, and that statement deals with the
 	// savepoints, or it's a rewind which also deals with them.
@@ -2809,7 +2819,7 @@ func (ex *connExecutor) execCopyOut(
 			stmtFingerprintID,
 			&stats,
 			ex.statsCollector,
-		)
+			ex.extraTxnState.shouldLogToTelemetry)
 	}()
 
 	stmtTS := ex.server.cfg.Clock.PhysicalTime()
@@ -3069,7 +3079,8 @@ func (ex *connExecutor) execCopyIn(
 			ex.server.TelemetryLoggingMetrics,
 			stmtFingerprintID,
 			&stats,
-			ex.statsCollector)
+			ex.statsCollector,
+			ex.extraTxnState.shouldLogToTelemetry)
 	}()
 
 	var copyErr error
diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index 3fa7a03b544a..afc17d6785ff 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -820,7 +820,7 @@ func (ex *connExecutor) execStmtInOpenState(
 			stmtFingerprintID,
 			&topLevelQueryStats{},
 			ex.statsCollector,
-		)
+			ex.extraTxnState.shouldLogToTelemetry)
 	}()
 
 	switch s := ast.(type) {
@@ -1797,7 +1797,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
 					ppInfo.dispatchToExecutionEngine.stmtFingerprintID,
 					ppInfo.dispatchToExecutionEngine.queryStats,
 					ex.statsCollector,
-				)
+					ex.extraTxnState.shouldLogToTelemetry)
 			},
 		})
 	} else {
@@ -1824,7 +1824,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
 				stmtFingerprintID,
 				&stats,
 				ex.statsCollector,
-			)
+				ex.extraTxnState.shouldLogToTelemetry)
 		}
 	}()
@@ -2503,7 +2503,7 @@ func (ex *connExecutor) execStmtInNoTxnState(
 			stmtFingerprintID,
 			&topLevelQueryStats{},
 			ex.statsCollector,
-		)
+			ex.extraTxnState.shouldLogToTelemetry)
 	}()
 
 	// We're in the NoTxn state, so no statements were executed earlier. Bump the
@@ -3138,11 +3138,9 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent, txnErr err
 			}
 			ex.server.ServerMetrics.StatsMetrics.DiscardedStatsCount.Inc(1)
 		}
+		// If we have a commitTimestamp, we should use it.
 		ex.previousTransactionCommitTimestamp.Forward(ev.commitTimestamp)
-		if telemetryLoggingEnabled.Get(&ex.server.cfg.Settings.SV) {
-			ex.server.TelemetryLoggingMetrics.onTxnFinish(ev.txnID.String())
-		}
 	}
 }
@@ -3163,6 +3161,7 @@ func (ex *connExecutor) onTxnRestart(ctx context.Context) {
 		if ex.server.cfg.TestingKnobs.BeforeRestart != nil {
 			ex.server.cfg.TestingKnobs.BeforeRestart(ctx, ex.state.mu.autoRetryReason)
 		}
+
 	}
 }
@@ -3175,6 +3174,10 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) {
 		TxnFingerprintID: appstatspb.InvalidTransactionFingerprintID,
 	})
 
+	tracingEnabled := ex.planner.ExtendedEvalContext().Tracing.Enabled()
+	ex.extraTxnState.shouldLogToTelemetry =
+		ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(tracingEnabled, ex.executorType == executorTypeInternal)
+
 	ex.state.mu.RLock()
 	txnStart := ex.state.mu.txnStart
 	ex.state.mu.RUnlock()
diff --git a/pkg/sql/exec_log.go b/pkg/sql/exec_log.go
index 72977e06de8d..aba84bc3ec30 100644
--- a/pkg/sql/exec_log.go
+++ b/pkg/sql/exec_log.go
@@ -109,6 +109,18 @@ const (
 	executorTypeInternal
 )
 
+// shouldForceLogStatement returns true if the statement should be force-logged
+// to TELEMETRY. Currently the criterion is that the statement is not of type
+// DML and is not BEGIN or COMMIT.
+func shouldForceLogStatement(ast tree.Statement) bool {
+	switch ast.StatementTag() {
+	case "BEGIN", "COMMIT":
+		return false
+	default:
+		return ast.StatementType() != tree.TypeDML
+	}
+}
+
 // vLevel returns the vmodule log level at which logs from the given executor
 // should be written to the logs.
 func (s executorType) vLevel() log.Level { return log.Level(s) + 2 }
@@ -132,12 +144,13 @@ func (p *planner) maybeLogStatement(
 	stmtFingerprintID appstatspb.StmtFingerprintID,
 	queryStats *topLevelQueryStats,
 	statsCollector sqlstats.StatsCollector,
+	shouldLogToTelemetry bool,
 ) {
 	p.maybeAuditRoleBasedAuditEvent(ctx, execType)
 	p.maybeLogStatementInternal(ctx, execType, numRetries, txnCounter,
 		rows, stmtCount, bulkJobId, err, queryReceived, hasAdminRoleCache,
 		telemetryLoggingMetrics, stmtFingerprintID, queryStats, statsCollector,
-	)
+		shouldLogToTelemetry)
 }
 
 func (p *planner) maybeLogStatementInternal(
@@ -152,6 +165,7 @@ func (p *planner) maybeLogStatementInternal(
 	stmtFingerprintID appstatspb.StmtFingerprintID,
 	topLevelQueryStats *topLevelQueryStats,
 	statsCollector sqlstats.StatsCollector,
+	shouldLogToTelemetry bool,
 ) {
 	// Note: if you find the code below crashing because p.execCfg == nil,
 	// do not add a test "if p.execCfg == nil { do nothing }" !
@@ -282,17 +296,28 @@
 	// the last event emission.
tracingEnabled := telemetryMetrics.isTracing(p.curPlan.instrumentation.Tracing()) - isStmtMode := telemetrySamplingMode.Get(&p.execCfg.Settings.SV) == telemetryModeStatement - // Always sample if one of the scenarios is true: - // - on 'statement' sampling and the current statement is not of type DML - // - on 'transaction' sampling mode and the current statement is not of type DML and is not a COMMIT + // - statement is not of type DML and is not BEGIN or COMMIT // - tracing is enabled for this statement // - this is a query emitted by our console (application_name starts with `$ internal-console`) and // the cluster setting to log console queries is enabled - forceLog := (p.stmt.AST.StatementType() != tree.TypeDML && - (isStmtMode || p.stmt.AST.StatementTag() != "COMMIT")) || - tracingEnabled || logConsoleQuery + forceLog := shouldForceLogStatement(p.stmt.AST) || tracingEnabled || logConsoleQuery + + emit, skippedQueries := telemetryMetrics.shouldEmitStatementLog(shouldLogToTelemetry, stmtCount, forceLog) + if !emit { + return + } + + var queryLevelStats execstats.QueryLevelStats + if stats, ok := p.instrumentation.GetQueryLevelStats(); ok { + queryLevelStats = *stats + } + + queryLevelStats = telemetryMetrics.getQueryLevelStats(queryLevelStats) + indexRecs := make([]string, 0, len(p.curPlan.instrumentation.indexRecs)) + for _, rec := range p.curPlan.instrumentation.indexRecs { + indexRecs = append(indexRecs, rec.SQL) + } var txnID string // p.txn can be nil for COPY. @@ -300,130 +325,113 @@ func (p *planner) maybeLogStatementInternal( txnID = p.txn.ID().String() } - if telemetryMetrics.shouldEmitStatementLog(telemetryMetrics.timeNow(), txnID, forceLog, stmtCount) { - var queryLevelStats execstats.QueryLevelStats - if stats, ok := p.instrumentation.GetQueryLevelStats(); ok { - queryLevelStats = *stats - } - - queryLevelStats = telemetryMetrics.getQueryLevelStats(queryLevelStats) - indexRecs := make([]string, 0, len(p.curPlan.instrumentation.indexRecs)) - for _, rec := range p.curPlan.instrumentation.indexRecs { - indexRecs = append(indexRecs, rec.SQL) - } - - phaseTimes := statsCollector.PhaseTimes() - - // Collect the statistics. - idleLatRaw := phaseTimes.GetIdleLatency(statsCollector.PreviousPhaseTimes()) - idleLatNanos := idleLatRaw.Nanoseconds() - runLatRaw := phaseTimes.GetRunLatency() - runLatNanos := runLatRaw.Nanoseconds() - parseLatNanos := phaseTimes.GetParsingLatency().Nanoseconds() - planLatNanos := phaseTimes.GetPlanningLatency().Nanoseconds() - // We want to exclude any overhead to reduce possible confusion. - svcLatRaw := phaseTimes.GetServiceLatencyNoOverhead() - svcLatNanos := svcLatRaw.Nanoseconds() - - // processing latency: contributing towards SQL results. 
- processingLatNanos := parseLatNanos + planLatNanos + runLatNanos - - // overhead latency: txn/retry management, error checking, etc - execOverheadNanos := svcLatNanos - processingLatNanos - - skippedQueries := telemetryMetrics.resetSkippedQueryCount() - - var sqlInstanceIDs []int32 - if len(queryLevelStats.SqlInstanceIds) > 0 { - sqlInstanceIDs = make([]int32, 0, len(queryLevelStats.SqlInstanceIds)) - for sqlId := range queryLevelStats.SqlInstanceIds { - sqlInstanceIDs = append(sqlInstanceIDs, int32(sqlId)) - } - sort.Slice(sqlInstanceIDs, func(i, j int) bool { - return sqlInstanceIDs[i] < sqlInstanceIDs[j] - }) - } - - sampledQuery := eventpb.SampledQuery{ - CommonSQLExecDetails: execDetails, - SkippedQueries: skippedQueries, - CostEstimate: p.curPlan.instrumentation.costEstimate, - Distribution: p.curPlan.instrumentation.distribution.String(), - PlanGist: p.curPlan.instrumentation.planGist.String(), - SessionID: p.extendedEvalCtx.SessionID.String(), - Database: p.CurrentDatabase(), - StatementID: p.stmt.QueryID.String(), - TransactionID: txnID, - StatementFingerprintID: uint64(stmtFingerprintID), - MaxFullScanRowsEstimate: p.curPlan.instrumentation.maxFullScanRows, - TotalScanRowsEstimate: p.curPlan.instrumentation.totalScanRows, - OutputRowsEstimate: p.curPlan.instrumentation.outputRows, - StatsAvailable: p.curPlan.instrumentation.statsAvailable, - NanosSinceStatsCollected: int64(p.curPlan.instrumentation.nanosSinceStatsCollected), - BytesRead: topLevelQueryStats.bytesRead, - RowsRead: topLevelQueryStats.rowsRead, - RowsWritten: topLevelQueryStats.rowsWritten, - InnerJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.InnerJoin]), - LeftOuterJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.LeftOuterJoin]), - FullOuterJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.FullOuterJoin]), - SemiJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.LeftSemiJoin]), - AntiJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.LeftAntiJoin]), - IntersectAllJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.IntersectAllJoin]), - ExceptAllJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.ExceptAllJoin]), - HashJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.HashJoin]), - CrossJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.CrossJoin]), - IndexJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.IndexJoin]), - LookupJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.LookupJoin]), - MergeJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.MergeJoin]), - InvertedJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.InvertedJoin]), - ApplyJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ApplyJoin]), - ZigZagJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ZigZagJoin]), - ContentionNanos: queryLevelStats.ContentionTime.Nanoseconds(), - Regions: queryLevelStats.Regions, - SQLInstanceIDs: sqlInstanceIDs, - NetworkBytesSent: queryLevelStats.NetworkBytesSent, - MaxMemUsage: queryLevelStats.MaxMemUsage, - MaxDiskUsage: queryLevelStats.MaxDiskUsage, - KVBytesRead: queryLevelStats.KVBytesRead, - KVPairsRead: queryLevelStats.KVPairsRead, - KVRowsRead: queryLevelStats.KVRowsRead, - KvTimeNanos: queryLevelStats.KVTime.Nanoseconds(), - KvGrpcCalls: queryLevelStats.KVBatchRequestsIssued, - NetworkMessages: queryLevelStats.NetworkMessages, - CpuTimeNanos: 
queryLevelStats.CPUTime.Nanoseconds(), - IndexRecommendations: indexRecs, - Indexes: p.curPlan.instrumentation.indexesUsed, - ScanCount: int64(p.curPlan.instrumentation.scanCounts[exec.ScanCount]), - ScanWithStatsCount: int64(p.curPlan.instrumentation.scanCounts[exec.ScanWithStatsCount]), - ScanWithStatsForecastCount: int64(p.curPlan.instrumentation.scanCounts[exec.ScanWithStatsForecastCount]), - TotalScanRowsWithoutForecastsEstimate: p.curPlan.instrumentation.totalScanRowsWithoutForecasts, - NanosSinceStatsForecasted: int64(p.curPlan.instrumentation.nanosSinceStatsForecasted), - IdleLatencyNanos: idleLatNanos, - ServiceLatencyNanos: svcLatNanos, - RunLatencyNanos: runLatNanos, - PlanLatencyNanos: planLatNanos, - ParseLatencyNanos: parseLatNanos, - OverheadLatencyNanos: execOverheadNanos, - MvccBlockBytes: queryLevelStats.MvccBlockBytes, - MvccBlockBytesInCache: queryLevelStats.MvccBlockBytesInCache, - MvccKeyBytes: queryLevelStats.MvccKeyBytes, - MvccPointCount: queryLevelStats.MvccPointCount, - MvccPointsCoveredByRangeTombstones: queryLevelStats.MvccPointsCoveredByRangeTombstones, - MvccRangeKeyContainedPoints: queryLevelStats.MvccRangeKeyContainedPoints, - MvccRangeKeyCount: queryLevelStats.MvccRangeKeyCount, - MvccRangeKeySkippedPoints: queryLevelStats.MvccRangeKeySkippedPoints, - MvccSeekCountInternal: queryLevelStats.MvccSeeksInternal, - MvccSeekCount: queryLevelStats.MvccSeeks, - MvccStepCountInternal: queryLevelStats.MvccStepsInternal, - MvccStepCount: queryLevelStats.MvccSteps, - MvccValueBytes: queryLevelStats.MvccValueBytes, - SchemaChangerMode: p.curPlan.instrumentation.schemaChangerMode.String(), + phaseTimes := statsCollector.PhaseTimes() + + // Collect the statistics. + idleLatRaw := phaseTimes.GetIdleLatency(statsCollector.PreviousPhaseTimes()) + idleLatNanos := idleLatRaw.Nanoseconds() + runLatRaw := phaseTimes.GetRunLatency() + runLatNanos := runLatRaw.Nanoseconds() + parseLatNanos := phaseTimes.GetParsingLatency().Nanoseconds() + planLatNanos := phaseTimes.GetPlanningLatency().Nanoseconds() + // We want to exclude any overhead to reduce possible confusion. + svcLatRaw := phaseTimes.GetServiceLatencyNoOverhead() + svcLatNanos := svcLatRaw.Nanoseconds() + + // processing latency: contributing towards SQL results. 
+ processingLatNanos := parseLatNanos + planLatNanos + runLatNanos + + // overhead latency: txn/retry management, error checking, etc + execOverheadNanos := svcLatNanos - processingLatNanos + + var sqlInstanceIDs []int32 + if len(queryLevelStats.SqlInstanceIds) > 0 { + sqlInstanceIDs = make([]int32, 0, len(queryLevelStats.SqlInstanceIds)) + for sqlId := range queryLevelStats.SqlInstanceIds { + sqlInstanceIDs = append(sqlInstanceIDs, int32(sqlId)) } + sort.Slice(sqlInstanceIDs, func(i, j int) bool { + return sqlInstanceIDs[i] < sqlInstanceIDs[j] + }) + } - p.logOperationalEventsOnlyExternally(ctx, &sampledQuery) - } else { - telemetryMetrics.incSkippedQueryCount() + sampledQuery := eventpb.SampledQuery{ + CommonSQLExecDetails: execDetails, + SkippedQueries: skippedQueries, + CostEstimate: p.curPlan.instrumentation.costEstimate, + Distribution: p.curPlan.instrumentation.distribution.String(), + PlanGist: p.curPlan.instrumentation.planGist.String(), + SessionID: p.extendedEvalCtx.SessionID.String(), + Database: p.CurrentDatabase(), + StatementID: p.stmt.QueryID.String(), + TransactionID: txnID, + StatementFingerprintID: uint64(stmtFingerprintID), + MaxFullScanRowsEstimate: p.curPlan.instrumentation.maxFullScanRows, + TotalScanRowsEstimate: p.curPlan.instrumentation.totalScanRows, + OutputRowsEstimate: p.curPlan.instrumentation.outputRows, + StatsAvailable: p.curPlan.instrumentation.statsAvailable, + NanosSinceStatsCollected: int64(p.curPlan.instrumentation.nanosSinceStatsCollected), + BytesRead: topLevelQueryStats.bytesRead, + RowsRead: topLevelQueryStats.rowsRead, + RowsWritten: topLevelQueryStats.rowsWritten, + InnerJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.InnerJoin]), + LeftOuterJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.LeftOuterJoin]), + FullOuterJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.FullOuterJoin]), + SemiJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.LeftSemiJoin]), + AntiJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.LeftAntiJoin]), + IntersectAllJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.IntersectAllJoin]), + ExceptAllJoinCount: int64(p.curPlan.instrumentation.joinTypeCounts[descpb.ExceptAllJoin]), + HashJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.HashJoin]), + CrossJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.CrossJoin]), + IndexJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.IndexJoin]), + LookupJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.LookupJoin]), + MergeJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.MergeJoin]), + InvertedJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.InvertedJoin]), + ApplyJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ApplyJoin]), + ZigZagJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ZigZagJoin]), + ContentionNanos: queryLevelStats.ContentionTime.Nanoseconds(), + Regions: queryLevelStats.Regions, + SQLInstanceIDs: sqlInstanceIDs, + NetworkBytesSent: queryLevelStats.NetworkBytesSent, + MaxMemUsage: queryLevelStats.MaxMemUsage, + MaxDiskUsage: queryLevelStats.MaxDiskUsage, + KVBytesRead: queryLevelStats.KVBytesRead, + KVPairsRead: queryLevelStats.KVPairsRead, + KVRowsRead: queryLevelStats.KVRowsRead, + KvTimeNanos: queryLevelStats.KVTime.Nanoseconds(), + KvGrpcCalls: queryLevelStats.KVBatchRequestsIssued, + NetworkMessages: queryLevelStats.NetworkMessages, + 
CpuTimeNanos: queryLevelStats.CPUTime.Nanoseconds(), + IndexRecommendations: indexRecs, + Indexes: p.curPlan.instrumentation.indexesUsed, + ScanCount: int64(p.curPlan.instrumentation.scanCounts[exec.ScanCount]), + ScanWithStatsCount: int64(p.curPlan.instrumentation.scanCounts[exec.ScanWithStatsCount]), + ScanWithStatsForecastCount: int64(p.curPlan.instrumentation.scanCounts[exec.ScanWithStatsForecastCount]), + TotalScanRowsWithoutForecastsEstimate: p.curPlan.instrumentation.totalScanRowsWithoutForecasts, + NanosSinceStatsForecasted: int64(p.curPlan.instrumentation.nanosSinceStatsForecasted), + IdleLatencyNanos: idleLatNanos, + ServiceLatencyNanos: svcLatNanos, + RunLatencyNanos: runLatNanos, + PlanLatencyNanos: planLatNanos, + ParseLatencyNanos: parseLatNanos, + OverheadLatencyNanos: execOverheadNanos, + MvccBlockBytes: queryLevelStats.MvccBlockBytes, + MvccBlockBytesInCache: queryLevelStats.MvccBlockBytesInCache, + MvccKeyBytes: queryLevelStats.MvccKeyBytes, + MvccPointCount: queryLevelStats.MvccPointCount, + MvccPointsCoveredByRangeTombstones: queryLevelStats.MvccPointsCoveredByRangeTombstones, + MvccRangeKeyContainedPoints: queryLevelStats.MvccRangeKeyContainedPoints, + MvccRangeKeyCount: queryLevelStats.MvccRangeKeyCount, + MvccRangeKeySkippedPoints: queryLevelStats.MvccRangeKeySkippedPoints, + MvccSeekCountInternal: queryLevelStats.MvccSeeksInternal, + MvccSeekCount: queryLevelStats.MvccSeeks, + MvccStepCountInternal: queryLevelStats.MvccStepsInternal, + MvccStepCount: queryLevelStats.MvccSteps, + MvccValueBytes: queryLevelStats.MvccValueBytes, + SchemaChangerMode: p.curPlan.instrumentation.schemaChangerMode.String(), } + + p.logOperationalEventsOnlyExternally(ctx, &sampledQuery) } } diff --git a/pkg/sql/telemetry_logging.go b/pkg/sql/telemetry_logging.go index e9e192d73c93..b217ed09d8a7 100644 --- a/pkg/sql/telemetry_logging.go +++ b/pkg/sql/telemetry_logging.go @@ -11,7 +11,6 @@ package sql import ( - "context" "sync/atomic" "time" @@ -30,16 +29,37 @@ const defaultMaxEventFrequency = 8 var TelemetryMaxStatementEventFrequency = settings.RegisterIntSetting( settings.ApplicationLevel, "sql.telemetry.query_sampling.max_event_frequency", - "the max event frequency at which we sample executions for telemetry, "+ + "the max event frequency (events per second) at which we sample executions for telemetry, "+ "note that it is recommended that this value shares a log-line limit of 10 "+ - " logs per second on the telemetry pipeline with all other telemetry events. "+ - "If sampling mode is set to 'transaction', all statements associated with a single "+ - "transaction are counted as 1 unit.", + "logs per second on the telemetry pipeline with all other telemetry events. "+ + "If sampling mode is set to 'transaction', this value is ignored.", defaultMaxEventFrequency, settings.NonNegativeInt, settings.WithPublic, ) +var telemetryTransactionSamplingFrequency = settings.RegisterIntSetting( + settings.ApplicationLevel, + "sql.telemetry.transaction_sampling.max_event_frequency", + "the max event frequency (events per second) at which we sample transactions for "+ + "telemetry. If sampling mode is set to 'statement', this setting is ignored. 
In "+ + "practice, this means that we only sample a transaction if 1/max_event_frequency seconds "+ + "have elapsed since the last transaction was sampled.", + defaultMaxEventFrequency, + settings.NonNegativeInt, + settings.WithPublic, +) + +var telemetryStatementsPerTransactionMax = settings.RegisterIntSetting( + settings.ApplicationLevel, + "sql.telemetry.transaction_sampling.statement_events_per_transaction.max", + "the maximum number of statement events to log for every sampled transaction. "+ + "Note that statements that are logged by force do not adhere to this limit.", + 50, + settings.NonNegativeInt, + settings.WithPublic, +) + var telemetryInternalQueriesEnabled = settings.RegisterBoolSetting( settings.ApplicationLevel, "sql.telemetry.query_sampling.internal.enabled", @@ -65,9 +85,9 @@ var telemetrySamplingMode = settings.RegisterEnumSetting( "sql.telemetry.query_sampling.mode", "the execution level used for telemetry sampling. If set to 'statement', events "+ "are sampled at the statement execution level. If set to 'transaction', events are "+ - "sampled at the txn execution level, i.e. all statements for a txn will be logged "+ - "and are counted together as one sampled event (events are still emitted one per "+ - "statement)", + "sampled at the transaction execution level, i.e. all statements for a transaction "+ + "will be logged and are counted together as one sampled event (events are still emitted one "+ + "per statement).", "statement", map[int64]string{ telemetryModeStatement: "statement", @@ -76,32 +96,29 @@ var telemetrySamplingMode = settings.RegisterEnumSetting( settings.WithPublic, ) -var telemetryTrackedTxnsLimit = settings.RegisterIntSetting( - settings.ApplicationLevel, - "sql.telemetry.txn_mode.tracking_limit", - "the maximum number of transactions tracked at one time for which we will send "+ - "all statements to telemetry", - 10000, - settings.NonNegativeInt, - settings.WithPublic, -) - -// telemetryLoggingMetrics keeps track of the last time at which an event -// was logged to the telemetry channel, and the number of skipped queries -// since the last logged event. +// TelemetryLoggingMetrics keeps track of the last time at which an event +// was sampled to the telemetry channel, and the number of skipped events +// since the last sampled event. +// +// There are two modes for telemetry logging, set via the setting telemetrySamplingMode: +// +// 1. Statement mode: Events are sampled at the statement level. In this mode, +// the sampling frequency for SampledQuery events is defined by the setting +// TelemetryMaxStatementEventFrequency. No transaction execution events are +// emitted in this mode. +// +// 2. Transaction mode: Events are sampled at the transaction level. In this mode, +// the sampling frequency for SampledQuery events is defined by the setting +// telemetryTransactionSamplingFrequency. In this mode, all of a transaction's +// statement execution events are logged up to a maximum set by +// telemetryStatementsPerTransactionMax. type telemetryLoggingMetrics struct { st *cluster.Settings mu struct { syncutil.RWMutex - // The timestamp of the last emitted telemetry event. + // The last time at which an event was sampled to the telemetry channel. lastEmittedTime time.Time - - // observedTxnExecutions is used to track txn executions that are currently - // being logged. When the sampling mode is set to txns, we must ensure we - // log all stmts for a txn. Txns are removed upon completing execution when - // all events have been captured. 
-		observedTxnExecutions map[string]interface{}
 	}
 
 	Knobs *TelemetryLoggingTestingKnobs
@@ -114,7 +131,6 @@ func newTelemetryLoggingMetrics(
 	knobs *TelemetryLoggingTestingKnobs, st *cluster.Settings,
 ) *telemetryLoggingMetrics {
 	t := telemetryLoggingMetrics{Knobs: knobs, st: st}
-	t.mu.observedTxnExecutions = make(map[string]interface{})
 	return &t
 }
@@ -142,49 +158,70 @@ func NewTelemetryLoggingTestingKnobs(
 	}
 }
 
-// registerOnTelemetrySamplingModeChange sets up the callback for when the
-// telemetry sampling mode is changed. When switching from txn to stmt, we
-// clear the txns we are currently tracking for logging.
-func (t *telemetryLoggingMetrics) registerOnTelemetrySamplingModeChange(
-	settings *cluster.Settings,
-) {
-	telemetrySamplingMode.SetOnChange(&settings.SV, func(ctx context.Context) {
-		mode := telemetrySamplingMode.Get(&settings.SV)
-		t.mu.Lock()
-		defer t.mu.Unlock()
-		if mode == telemetryModeStatement {
-			// Clear currently observed txns.
-			t.mu.observedTxnExecutions = make(map[string]interface{})
-		}
-	})
+// getLastEmittedTime returns the last emitted time for a telemetry event.
+// Used in testing.
+func (t *telemetryLoggingMetrics) getLastEmittedTime() time.Time {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+	return t.mu.lastEmittedTime
 }
 
-func (t *telemetryLoggingMetrics) onTxnFinish(txnExecutionID string) {
+// shouldEmitTransactionLog returns true if the transaction should be tracked
+// for telemetry. A transaction is tracked if telemetry logging is enabled,
+// the telemetry mode is set to "transaction", the transaction is not internal
+// (or sampling for internal statements is enabled), and one of the following
+// holds:
+//   - the force parameter is true
+//   - the required amount of time has elapsed since the last transaction was sampled
+//
+// If the conditions are met, the last sampled time is updated.
+func (t *telemetryLoggingMetrics) shouldEmitTransactionLog(force, isInternal bool) bool {
+	if !telemetryLoggingEnabled.Get(&t.st.SV) {
+		return false
+	}
 	if telemetrySamplingMode.Get(&t.st.SV) != telemetryModeTransaction {
-		return
+		return false
 	}
-	//
-	// Check if txn exec id exists in the map.
-	exists := false
+	if isInternal && !telemetryInternalQueriesEnabled.Get(&t.st.SV) {
+		return false
+	}
+	maxEventFrequency := telemetryTransactionSamplingFrequency.Get(&t.st.SV)
+	if maxEventFrequency == 0 {
+		return false
+	}
+
+	txnSampleTime := t.timeNow()
+
+	if force {
+		t.mu.Lock()
+		defer t.mu.Unlock()
+		t.mu.lastEmittedTime = txnSampleTime
+		return true
+	}
+
+	requiredTimeElapsed := time.Second / time.Duration(maxEventFrequency)
+	var enoughTimeElapsed bool
 	func() {
+		// Avoid taking the full lock if we don't have to.
 		t.mu.RLock()
 		defer t.mu.RUnlock()
-		_, exists = t.mu.observedTxnExecutions[txnExecutionID]
+		enoughTimeElapsed = txnSampleTime.Sub(t.mu.lastEmittedTime) >= requiredTimeElapsed
 	}()
-	if !exists {
-		return
+	if !enoughTimeElapsed {
+		return false
 	}
 
 	t.mu.Lock()
 	defer t.mu.Unlock()
-	delete(t.mu.observedTxnExecutions, txnExecutionID)
-}
-
-func (t *telemetryLoggingMetrics) getTrackedTxnsCount() int {
-	t.mu.RLock()
-	defer t.mu.RUnlock()
-	return len(t.mu.observedTxnExecutions)
+
+	// The lastEmittedTime may have changed since releasing the read lock, so
+	// recheck it before updating.
+	if txnSampleTime.Sub(t.mu.lastEmittedTime) < requiredTimeElapsed {
+		return false
+	}
+
+	t.mu.lastEmittedTime = txnSampleTime
+
+	return true
 }
 
 // ModuleTestingKnobs implements base.ModuleTestingKnobs interface.
@@ -197,67 +234,93 @@ func (t *telemetryLoggingMetrics) timeNow() time.Time { return timeutil.Now() } -// shouldEmitStatementLog returns true if the stmt should be logged to telemetry. The last emitted time -// tracked by telemetry logging metrics will be updated to the given time if any of the following -// are met: -// - The telemetry mode is set to "transaction" AND the stmt is the first in -// the txn AND the txn is not already being tracked AND the required amount -// of time has elapsed. +// shouldEmitStatementLog returns true if the stmt should be logged to telemetry. +// One of the following must be true for a statement to be logged: +// - The telemetry mode is set to "transaction" and the statement's transaction is being tracked +// AND the transaction has not reached its limit for the number of statements logged. // - The telemetry mode is set to "statement" AND the required amount of time has elapsed -// - The txn is not being tracked and the stmt is being forced to log. +// - The stmt is being forced to log. +// +// In addition, the lastEmittedTime tracked by TelemetryLoggingMetrics is updated if the statement +// is logged and we are in "statement" mode. func (t *telemetryLoggingMetrics) shouldEmitStatementLog( - newTime time.Time, txnExecutionID string, force bool, stmtPosInTxn int, -) (shouldEmit bool) { - maxEventFrequency := TelemetryMaxStatementEventFrequency.Get(&t.st.SV) - requiredTimeElapsed := time.Second / time.Duration(maxEventFrequency) + txnIsTracked bool, stmtNum int, force bool, +) (emit bool, skippedQueryCount uint64) { + // For these early exit cases, we don't want to increment the skipped queries count + // since telemetry logging for statements is off in these cases. + if !telemetryLoggingEnabled.Get(&t.st.SV) { + return false, t.skippedQueryCount.Load() + } isTxnMode := telemetrySamplingMode.Get(&t.st.SV) == telemetryModeTransaction - txnsLimit := int(telemetryTrackedTxnsLimit.Get(&t.st.SV)) + if isTxnMode && telemetryTransactionSamplingFrequency.Get(&t.st.SV) == 0 { + return false, t.skippedQueryCount.Load() + } + maxEventFrequency := TelemetryMaxStatementEventFrequency.Get(&t.st.SV) + if !isTxnMode && maxEventFrequency == 0 { + return false, t.skippedQueryCount.Load() + } - if isTxnMode && txnExecutionID == "" { - // If we are in transaction mode, skip logging statements without txn ids - // since we won't be able to track the stmt's txn through its execution. - // This will skip statements like BEGIN which don't have an associated - // transaction id. - return false + newTime := t.timeNow() + + if force { + // We should hold the lock while resetting the skipped queries. + t.mu.Lock() + defer t.mu.Unlock() + skippedQueryCount = t.skippedQueryCount.Swap(0) + if !isTxnMode { + t.mu.lastEmittedTime = newTime + } + return true, skippedQueryCount + } + + if isTxnMode { + if stmtNum == 0 { + // We skip BEGIN statements for transaction telemetry mode. This is because BEGIN statements + // don't have associated transaction execution ids since the transaction doesn't actually + // officially start execution until its first statement. + return false, t.skippedQueryCount.Add(1) + } + + if txnIsTracked { + // Log if we are not at the limit for the number of statements logged + // for this transaction. + // We don't need to update the last sampled time in this case. 
+ if int64(stmtNum) <= telemetryStatementsPerTransactionMax.Get(&t.st.SV) { + return true, t.skippedQueryCount.Swap(0) + } + return false, t.skippedQueryCount.Add(1) + } + + // If the transaction is not being tracked then we are done. + return false, t.skippedQueryCount.Add(1) } - var enoughTimeElapsed, txnIsTracked, startTrackingTxn bool - // Avoid taking the full lock if we don't have to. + // We are in statement mode. Check if enough time has elapsed since the last sampled time. + requiredTimeElapsed := time.Second / time.Duration(maxEventFrequency) + + var enoughTimeElapsed bool func() { + // Avoid taking the full lock if we don't have to. t.mu.RLock() defer t.mu.RUnlock() - + skippedQueryCount = t.skippedQueryCount.Load() enoughTimeElapsed = newTime.Sub(t.mu.lastEmittedTime) >= requiredTimeElapsed - startTrackingTxn = isTxnMode && txnExecutionID != "" && - stmtPosInTxn == 1 && len(t.mu.observedTxnExecutions) < txnsLimit - _, txnIsTracked = t.mu.observedTxnExecutions[txnExecutionID] }() - if txnIsTracked || (!force && (!enoughTimeElapsed || (isTxnMode && !startTrackingTxn))) { - // We don't want to update the last emitted time if the transaction is already tracked. - // We can also early exit here if we aren't forcing the log and we don't meed the required - // elapsed time or can't start tracking the txn due to not having received the first stmt. - return txnIsTracked + if !enoughTimeElapsed { + return false, t.skippedQueryCount.Add(1) } t.mu.Lock() defer t.mu.Unlock() - // The lastEmittedTime and tracked txns may have changed since releasing the Rlock. - // The tracked transaction count could have changed as well so we recheck these values. - txnLimitReached := len(t.mu.observedTxnExecutions) >= txnsLimit - if !force && - (newTime.Sub(t.mu.lastEmittedTime) < requiredTimeElapsed || (startTrackingTxn && txnLimitReached)) { - return false - } - - // We could be forcing the log so we should only track its txn if it meets the criteria. 
- if startTrackingTxn && !txnLimitReached { - t.mu.observedTxnExecutions[txnExecutionID] = struct{}{} + if newTime.Sub(t.mu.lastEmittedTime) < requiredTimeElapsed { + return false, t.skippedQueryCount.Add(1) } + skippedQueryCount = t.skippedQueryCount.Swap(0) t.mu.lastEmittedTime = newTime - return true + return true, skippedQueryCount } func (t *telemetryLoggingMetrics) getQueryLevelStats( @@ -276,10 +339,8 @@ func (t *telemetryLoggingMetrics) isTracing(_ *tracing.Span, tracingEnabled bool return tracingEnabled } -func (t *telemetryLoggingMetrics) resetSkippedQueryCount() (res uint64) { - return t.skippedQueryCount.Swap(0) -} - -func (t *telemetryLoggingMetrics) incSkippedQueryCount() { - t.skippedQueryCount.Add(1) +func (t *telemetryLoggingMetrics) resetLastEmittedTime() { + t.mu.Lock() + defer t.mu.Unlock() + t.mu.lastEmittedTime = time.Time{} } diff --git a/pkg/sql/telemetry_logging_test.go b/pkg/sql/telemetry_logging_test.go index e8641e8250ab..069a5a430ffb 100644 --- a/pkg/sql/telemetry_logging_test.go +++ b/pkg/sql/telemetry_logging_test.go @@ -12,11 +12,9 @@ package sql import ( "context" - gosql "database/sql" "encoding/json" "fmt" "math" - "net/url" "regexp" "sort" "strings" @@ -24,11 +22,9 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" - "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -442,6 +438,18 @@ func TestTelemetryLogging(t *testing.T) { log.FlushFiles() + // We should not see any transaction events in statement + // telemetry mode. + txnEntries, err := log.FetchEntriesFromFiles( + 0, + math.MaxInt64, + 10000, + regexp.MustCompile(`"EventType":"sampled_transaction"`), + log.WithMarkedSensitiveData, + ) + require.NoError(t, err) + require.Emptyf(t, txnEntries, "found unexpected transaction telemetry events: %v", txnEntries) + entries, err := log.FetchEntriesFromFiles( 0, math.MaxInt64, @@ -466,6 +474,9 @@ func TestTelemetryLogging(t *testing.T) { for _, tc := range testData { t.Run(tc.name, func(t *testing.T) { + t.Cleanup(func() { + s.SQLServer().(*Server).TelemetryLoggingMetrics.resetLastEmittedTime() + }) logCount := 0 expectedLogCount := len(tc.expectedSkipped) // NB: FetchEntriesFromFiles delivers entries in reverse order. @@ -482,7 +493,7 @@ func TestTelemetryLogging(t *testing.T) { err = json.Unmarshal([]byte(e.Message), &sampledQueryFromLog) require.NoError(t, err) - require.Equal(t, tc.expectedSkipped[logCount], sampledQueryFromLog.SkippedQueries) + require.Equal(t, tc.expectedSkipped[logCount], sampledQueryFromLog.SkippedQueries, "%v", e.Message) logCount++ @@ -1668,499 +1679,3 @@ func TestTelemetryLoggingStmtPosInTxn(t *testing.T) { } } } - -type testQuery struct { - query string - logTime float64 - tracingEnabled bool -} - -type expectedLog struct { - logMsg string - skippedQueryCount int -} - -// TestTelemetryLoggingTxnMode tests that when the telemetry logging is set to "transaction", -// we sample events at the transaction level, which means that we'll emit all statements -// for a transaction as if they were counted as 1 emitted event. 
-func TestTelemetryLoggingTxnMode(t *testing.T) { - defer leaktest.AfterTest(t)() - sc := log.ScopeWithoutShowLogs(t) - defer sc.Close(t) - - cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY) - defer cleanup() - - st := logtestutils.StubTime{} - sts := logtestutils.StubTracingStatus{} - - s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ - Knobs: base.TestingKnobs{ - EventLog: &EventLogTestingKnobs{ - // The sampling checks below need to have a deterministic - // number of statements run by internal executor. - SyncWrites: true, - }, - TelemetryLoggingKnobs: &TelemetryLoggingTestingKnobs{ - getTimeNow: st.TimeNow, - getTracingStatus: sts.TracingStatus, - }, - }, - }) - - defer s.Stopper().Stop(context.Background()) - - queries := []testQuery{ - { - query: "BEGIN; TRUNCATE t; COMMIT", - logTime: 0, // Logged. - }, - { - query: "BEGIN; SELECT 1; SELECT 2; SELECT 3; COMMIT", - logTime: 1, // Logged. - }, - { - query: "SELECT 1, 2;", - logTime: 1, // Skipped. - }, - { - query: "SELECT 1, 2;", - logTime: 1, // Logged. Tracing is enabled. - tracingEnabled: true, - }, - { - query: `SELECT * FROM t LIMIT 1`, - logTime: 1.05, // Skipped. - }, - { - query: `SELECT * FROM t LIMIT 1`, - logTime: 1.08, // Skipped. - }, - { - query: `SELECT * FROM t LIMIT 2`, - logTime: 1.1, // Logged. - }, - { - query: `BEGIN; SELECT * FROM t LIMIT 3; COMMIT`, - logTime: 1.15, // Skipped. - }, - { - query: `BEGIN; SELECT * FROM t LIMIT 4; SELECT * FROM t LIMIT 5; COMMIT`, - logTime: 1.2, // Logged. - }, - } - - expectedLogs := []expectedLog{ - { - logMsg: `TRUNCATE TABLE defaultdb.public.t`, - skippedQueryCount: 1, // Skipped BEGIN. - }, - { - logMsg: `COMMIT TRANSACTION`, - skippedQueryCount: 0, - }, - { - logMsg: `SELECT ‹1›`, - skippedQueryCount: 1, // BEGIN skipped. 
- }, - { - logMsg: `SELECT ‹2›`, - skippedQueryCount: 0, - }, - { - logMsg: `SELECT ‹3›`, - skippedQueryCount: 0, - }, - { - logMsg: `COMMIT TRANSACTION`, - skippedQueryCount: 0, - }, - { - logMsg: `SELECT ‹1›, ‹2›`, - skippedQueryCount: 1, - }, - { - logMsg: `SELECT * FROM ""."".t LIMIT ‹2›`, - skippedQueryCount: 2, - }, - { - logMsg: `SELECT * FROM ""."".t LIMIT ‹4›`, - skippedQueryCount: 4, - }, - { - logMsg: `SELECT * FROM ""."".t LIMIT ‹5›`, - skippedQueryCount: 0, - }, - { - logMsg: `COMMIT TRANSACTION`, - skippedQueryCount: 0, - }, - } - - db := sqlutils.MakeSQLRunner(sqlDB) - st.SetTime(timeutil.FromUnixMicros(0)) - db.Exec(t, `SET application_name = 'telemetry-logging-test-txn-mode'`) - db.Exec(t, "CREATE TABLE t();") - db.Exec(t, "CREATE TABLE u(x int);") - - db.Exec(t, `SET CLUSTER SETTING sql.telemetry.query_sampling.max_event_frequency = 10;`) - db.Exec(t, `SET CLUSTER SETTING sql.telemetry.query_sampling.mode = "transaction";`) - db.Exec(t, `SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true;`) - - for _, query := range queries { - stubTime := timeutil.FromUnixMicros(int64(query.logTime * 1e6)) - st.SetTime(stubTime) - sts.SetTracingStatus(query.tracingEnabled) - db.Exec(t, query.query) - } - - log.FlushFiles() - - entries, err := log.FetchEntriesFromFiles( - 0, - math.MaxInt64, - 10000, - regexp.MustCompile(`"EventType":"sampled_query"`), - log.WithMarkedSensitiveData, - ) - - if err != nil { - t.Fatal(err) - } - - if len(entries) == 0 { - t.Fatal(errors.Newf("no entries found")) - } - - for _, e := range entries { - if strings.Contains(e.Message, `"ExecMode":"`+executorTypeInternal.logLabel()) { - t.Errorf("unexpected telemetry event for internal statement:\n%s", e.Message) - } - } - - expectedLogCount := len(expectedLogs) - require.GreaterOrEqualf(t, len(entries), expectedLogCount, - "expected at least %d log entries, got: %d\nentries:\n%v", expectedLogCount, len(entries), entries) - - // FetchEntriesFromFiles delivers entries in reverse order. - entryIdx := len(entries) - 1 - - // Skip the cluster setting queries. - for strings.Contains(entries[entryIdx].Message, "SET CLUSTER SETTING") { - entryIdx-- - } - - for i := 0; i < expectedLogCount; i++ { - e := entries[entryIdx-i] - - var sq eventpb.SampledQuery - err = json.Unmarshal([]byte(e.Message), &sq) - require.NoError(t, err) - - if !strings.Contains(string(sq.Statement), expectedLogs[i].logMsg) { - t.Fatalf("expected log message to contain:\n%s\nbut received:\n%s\nentries:\n%v\n", - expectedLogs[i].logMsg, sq.Statement, entries) - } - - expectedSkipped := expectedLogs[i].skippedQueryCount - if expectedSkipped == 0 { - require.Zerof(t, sq.SkippedQueries, "expected no skipped queries, found:\n%s", e.Message) - } else { - require.Equalf(t, uint64(expectedSkipped), sq.SkippedQueries, "expected skipped queries, found:\n%s", e.Message) - } - } -} - -// TestTelemetryLoggingClearsTxns tests that when the telemetry logging is set to "statements", -// the list of tracked txns is cleared. 
-func TestTelemetryLoggingClearsTxns(t *testing.T) { - defer leaktest.AfterTest(t)() - sc := log.ScopeWithoutShowLogs(t) - defer sc.Close(t) - - cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY) - defer cleanup() - - st := logtestutils.StubTime{} - sts := logtestutils.StubTracingStatus{} - - s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ - Knobs: base.TestingKnobs{ - EventLog: &EventLogTestingKnobs{ - // The sampling checks below need to have a deterministic - // number of statements run by internal executor. - SyncWrites: true, - }, - TelemetryLoggingKnobs: &TelemetryLoggingTestingKnobs{ - getTimeNow: st.TimeNow, - getTracingStatus: sts.TracingStatus, - }, - }, - }) - - defer s.Stopper().Stop(context.Background()) - - db := sqlutils.MakeSQLRunner(sqlDB) - - pgURL, cleanupGoDB := sqlutils.PGUrl( - t, s.AdvSQLAddr(), "CreateConnections" /* prefix */, url.User(username.RootUser)) - defer cleanupGoDB() - sqlDB2, err := gosql.Open("postgres", pgURL.String()) - require.NoError(t, err) - defer sqlDB2.Close() - db2 := sqlutils.MakeSQLRunner(sqlDB2) - - st.SetTime(timeutil.FromUnixMicros(0)) - db.Exec(t, `SET application_name = 'telemetry-logging-test-txns-cleared'`) - - db.Exec(t, `SET CLUSTER SETTING sql.telemetry.query_sampling.max_event_frequency = 10;`) - db.Exec(t, `SET CLUSTER SETTING sql.telemetry.query_sampling.mode = "transaction";`) - db.Exec(t, `SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true;`) - - telemetryLogging := s.SQLServer().(*Server).TelemetryLoggingMetrics - - require.Zero(t, telemetryLogging.getTrackedTxnsCount()) - stubTime := timeutil.Now() - st.SetTime(stubTime) - db.Exec(t, "BEGIN; SELECT 1;") - require.Equal(t, 1, telemetryLogging.getTrackedTxnsCount()) - - // Ensure that the tracked txn cache is cleared. - db2.Exec(t, `SET CLUSTER SETTING sql.telemetry.query_sampling.mode = "statement";`) - - testutils.SucceedsSoon(t, func() error { - if telemetryLogging.getTrackedTxnsCount() != 0 { - return errors.Newf("expected no tracked txns") - } - return nil - }) - - db.Exec(t, "COMMIT;") -} - -func TestTelemetryLoggingRespectsTxnLimit(t *testing.T) { - defer leaktest.AfterTest(t)() - sc := log.ScopeWithoutShowLogs(t) - defer sc.Close(t) - - cleanup := logtestutils.InstallLogFileSink(sc, t, logpb.Channel_TELEMETRY) - defer cleanup() - - st := logtestutils.StubTime{} - sts := logtestutils.StubTracingStatus{} - - s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ - Knobs: base.TestingKnobs{ - EventLog: &EventLogTestingKnobs{ - // The sampling checks below need to have a deterministic - // number of statements run by internal executor. - SyncWrites: true, - }, - TelemetryLoggingKnobs: &TelemetryLoggingTestingKnobs{ - getTimeNow: st.TimeNow, - getTracingStatus: sts.TracingStatus, - }, - }, - }) - - defer s.Stopper().Stop(context.Background()) - - // Start 2 txns. Start the second txn with enough elapsed time that would normally allow - // txn tracking for telemetry, however since we set the limit to 1 tracked txn, the - // statements in the 2nd txn will only be logged if it is non DML or tracing is on. - - queries := []struct { - testQuery - txnNum int - }{ - { - testQuery: testQuery{ - query: "BEGIN;", - logTime: 1, - }, - txnNum: 1, - }, - { - testQuery: testQuery{ - query: "BEGIN;", - logTime: 2, - }, - txnNum: 2, - }, - { - testQuery: testQuery{ - query: "SELECT 1;", // Logged. - logTime: 2, - }, - txnNum: 1, - }, - { - testQuery: testQuery{ - query: "SELECT 2;", // Skipped. 
- logTime: 2, - }, - txnNum: 2, - }, - { - testQuery: testQuery{ - query: "SELECT 1, 1;", // Logged. - logTime: 2, - }, - txnNum: 1, - }, - { - testQuery: testQuery{ - query: "SELECT 2, 2;", // Skipped. - logTime: 2, - }, - txnNum: 2, - }, - { - testQuery: testQuery{ - query: "SELECT 2, 2, 2;", // Not logged, even though enough time has elapsed. - logTime: 2.1, - }, - txnNum: 2, - }, - { - testQuery: testQuery{ - query: "SELECT 2, 2, 2, 2;", // Logged. Tracing is on. - logTime: 2.1, - tracingEnabled: true, - }, - txnNum: 2, - }, - { - testQuery: testQuery{ - query: "SELECT 1, 1, 1;", // Logged. - logTime: 2.1, - }, - txnNum: 1, - }, - { - testQuery: testQuery{ - query: "COMMIT;", - logTime: 3, - }, - txnNum: 1, - }, - { - testQuery: testQuery{ - query: "SELECT 2, 2, 2;", // Not logged, even though enough time has elapsed it is not the first stmt. - logTime: 3.5, - }, - txnNum: 2, - }, - { - testQuery: testQuery{ - query: "COMMIT;", - logTime: 3, - }, - txnNum: 2, - }, - } - - expectedLogs := []expectedLog{ - { - logMsg: `SELECT ‹1›`, - skippedQueryCount: 2, // Skipped both BEGIN queries. - }, - { - logMsg: `SELECT ‹1›, ‹1›`, - skippedQueryCount: 1, - }, - { - logMsg: `SELECT ‹2›, ‹2›, ‹2›, ‹2›`, - skippedQueryCount: 2, - }, - { - logMsg: `SELECT ‹1›, ‹1›, ‹1›`, - skippedQueryCount: 0, - }, - } - - db := sqlutils.MakeSQLRunner(sqlDB) - - pgURL, cleanupGoDB := sqlutils.PGUrl( - t, s.AdvSQLAddr(), "CreateConnections" /* prefix */, url.User(username.RootUser)) - defer cleanupGoDB() - sqlDB2, err := gosql.Open("postgres", pgURL.String()) - require.NoError(t, err) - defer sqlDB2.Close() - db2 := sqlutils.MakeSQLRunner(sqlDB2) - - st.SetTime(timeutil.FromUnixMicros(0)) - db.Exec(t, `SET application_name = 'telemetry-logging-test-txn-limit'`) - - db.Exec(t, `SET CLUSTER SETTING sql.telemetry.txn_mode.tracking_limit = 1;`) - db.Exec(t, `SET CLUSTER SETTING sql.telemetry.query_sampling.max_event_frequency = 10;`) - db.Exec(t, `SET CLUSTER SETTING sql.telemetry.query_sampling.mode = "transaction";`) - db.Exec(t, `SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true;`) - - for _, query := range queries { - stubTime := timeutil.FromUnixMicros(int64(query.logTime * 1e6)) - st.SetTime(stubTime) - sts.SetTracingStatus(query.tracingEnabled) - if query.txnNum == 1 { - db.Exec(t, query.query) - } else { - db2.Exec(t, query.query) - } - } - - log.FlushFiles() - - entries, err := log.FetchEntriesFromFiles( - 0, - math.MaxInt64, - 10000, - regexp.MustCompile(`"EventType":"sampled_query"`), - log.WithMarkedSensitiveData, - ) - - if err != nil { - t.Fatal(err) - } - - if len(entries) == 0 { - t.Fatal(errors.Newf("no entries found")) - } - - for _, e := range entries { - if strings.Contains(e.Message, `"ExecMode":"`+executorTypeInternal.logLabel()) { - t.Errorf("unexpected telemetry event for internal statement:\n%s", e.Message) - } - } - - expectedLogCount := len(expectedLogs) - if expectedLogCount > len(entries) { - t.Fatalf("expected at least %d log entries, got: %d", expectedLogCount, len(entries)) - } - - // FetchEntriesFromFiles delivers entries in reverse order. - entryIdx := len(entries) - 1 - - // Skip the cluster setting queries. 
- for strings.Contains(entries[entryIdx].Message, "SET CLUSTER SETTING") { - entryIdx-- - } - - for i := 0; i < expectedLogCount; i++ { - e := entries[entryIdx-i] - if !strings.Contains(e.Message, expectedLogs[i].logMsg+"\"") { - t.Errorf("expected log message to contain:\n%s\nbut received:\n%s\n", - expectedLogs[i].logMsg, e.Message) - } - - var sq eventpb.SampledQuery - err = json.Unmarshal([]byte(e.Message), &sq) - require.NoError(t, err) - - expectedSkipped := expectedLogs[i].skippedQueryCount - if expectedSkipped == 0 { - require.Zerof(t, sq.SkippedQueries, "expected no skipped queries, found:\n%s", e.Message) - } else { - require.Equalf(t, uint64(expectedSkipped), sq.SkippedQueries, "expected skipped queries, found:\n%s", e.Message) - } - } -}