Skip to content

Commit

Permalink
telemetry: track telemetry transactions through conn executor
Browse files Browse the repository at this point in the history
This change modifies the telemetry transaction sampling process to be simpler.
Previously for telemetry transaction sampling the telemetry logging struct
managed the tracking of sampled transactions via a map of execution
ids. If a transaction was determined to be sampled, we would input an entry
in this map and for each statement, we would check the map to see if the
statement belongs to a tracked transaction. Instead of using a map, we
can mark the transaction as being sampled by telemetry in the conn executor.
This removes the need for concurrent data struct access when the transaction
is marked as being sampled, since each statement no longer needs to read an
entry from the shared map. This also removes the need to track the number
of transactions currently being sampled for telemetry, as this was introduced
to manage the memory used by the map.

We will now determine if the transaction should be logged to telemetry at
the start of transaction execution or at the start of a transaction restart.
The transaction will be marked for telemetry logging if enough time has
elapsed since the last transaction was sampled or if session tracing is on.

Transaction statements will be logged according to the following settings:
- sql.telemetry.transaction_sampling.frequency controls the frequency at
which we sample a transaction. If a transaction is marked to be sampled
by telemetry, this means we will log all of its statement execution events
to telemetry, up to a maximum of
`sql.telemetry.transaction_sampling.statement_events_per_transaction.max`
statements.

Part of: cockroachdb#108284

Release note (ops change): New cluster settings:
 - sql.telemetry.transaction_sampling.statement_events_per_transaction.max:
controls the maximum number of statement events to emit per sampled
transaction for TELEMETRY
- sql.telemetry.transaction_sampling.frequency: controls the maximum
frequency at which we sample transactions for telemetry
  • Loading branch information
xinhaoz committed Jan 9, 2024
1 parent 2c3b07e commit 1dde7c0
Show file tree
Hide file tree
Showing 7 changed files with 665 additions and 566 deletions.
7 changes: 4 additions & 3 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,10 @@ sql.stats.system_tables.enabled boolean true when true, enables use of statistic
sql.stats.system_tables_autostats.enabled boolean true when true, enables automatic collection of statistics on system tables application
sql.telemetry.query_sampling.enabled boolean false when set to true, executed queries will emit an event on the telemetry logging channel application
sql.telemetry.query_sampling.internal.enabled boolean false when set to true, internal queries will be sampled in telemetry logging application
sql.telemetry.query_sampling.max_event_frequency integer 8 the max event frequency at which we sample executions for telemetry, note that it is recommended that this value shares a log-line limit of 10 logs per second on the telemetry pipeline with all other telemetry events. If sampling mode is set to 'transaction', all statements associated with a single transaction are counted as 1 unit. application
sql.telemetry.query_sampling.mode enumeration statement the execution level used for telemetry sampling. If set to 'statement', events are sampled at the statement execution level. If set to 'transaction', events are sampled at the txn execution level, i.e. all statements for a txn will be logged and are counted together as one sampled event (events are still emitted one per statement) [statement = 0, transaction = 1] application
sql.telemetry.txn_mode.tracking_limit integer 10000 the maximum number of transactions tracked at one time for which we will send all statements to telemetry application
sql.telemetry.query_sampling.max_event_frequency integer 8 the max event frequency (events per second) at which we sample executions for telemetry, note that it is recommended that this value shares a log-line limit of 10 logs per second on the telemetry pipeline with all other telemetry events. If sampling mode is set to 'transaction', this value is ignored. application
sql.telemetry.query_sampling.mode enumeration statement the execution level used for telemetry sampling. If set to 'statement', events are sampled at the statement execution level. If set to 'transaction', events are sampled at the transaction execution level, i.e. all statements for a transaction will be logged and are counted together as one sampled event (events are still emitted one per statement). [statement = 0, transaction = 1] application
sql.telemetry.transaction_sampling.max_event_frequency integer 8 the max event frequency (events per second) at which we sample transactions for telemetry. If sampling mode is set to 'statement', this setting is ignored. In practice, this means that we only sample a transaction if 1/max_event_frequency seconds have elapsed since the last transaction was sampled. application
sql.telemetry.transaction_sampling.statement_events_per_transaction.max integer 50 the maximum number of statement events to log for every sampled transaction. Note that statements that are logged by force do not adhere to this limit. application
sql.temp_object_cleaner.cleanup_interval duration 30m0s how often to clean up orphaned temporary objects application
sql.temp_object_cleaner.wait_interval duration 30m0s how long after creation a temporary object will be cleaned up application
sql.log.all_statements.enabled boolean false set to true to enable logging of all executed statements application
Expand Down
7 changes: 4 additions & 3 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,10 @@
<tr><td><div id="setting-sql-stats-system-tables-autostats-enabled" class="anchored"><code>sql.stats.system_tables_autostats.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>when true, enables automatic collection of statistics on system tables</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-telemetry-query-sampling-enabled" class="anchored"><code>sql.telemetry.query_sampling.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>when set to true, executed queries will emit an event on the telemetry logging channel</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-telemetry-query-sampling-internal-enabled" class="anchored"><code>sql.telemetry.query_sampling.internal.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>when set to true, internal queries will be sampled in telemetry logging</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-telemetry-query-sampling-max-event-frequency" class="anchored"><code>sql.telemetry.query_sampling.max_event_frequency</code></div></td><td>integer</td><td><code>8</code></td><td>the max event frequency at which we sample executions for telemetry, note that it is recommended that this value shares a log-line limit of 10 logs per second on the telemetry pipeline with all other telemetry events. If sampling mode is set to &#39;transaction&#39;, all statements associated with a single transaction are counted as 1 unit.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-telemetry-query-sampling-mode" class="anchored"><code>sql.telemetry.query_sampling.mode</code></div></td><td>enumeration</td><td><code>statement</code></td><td>the execution level used for telemetry sampling. If set to &#39;statement&#39;, events are sampled at the statement execution level. If set to &#39;transaction&#39;, events are sampled at the txn execution level, i.e. all statements for a txn will be logged and are counted together as one sampled event (events are still emitted one per statement) [statement = 0, transaction = 1]</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-telemetry-txn-mode-tracking-limit" class="anchored"><code>sql.telemetry.txn_mode.tracking_limit</code></div></td><td>integer</td><td><code>10000</code></td><td>the maximum number of transactions tracked at one time for which we will send all statements to telemetry</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-telemetry-query-sampling-max-event-frequency" class="anchored"><code>sql.telemetry.query_sampling.max_event_frequency</code></div></td><td>integer</td><td><code>8</code></td><td>the max event frequency (events per second) at which we sample executions for telemetry, note that it is recommended that this value shares a log-line limit of 10 logs per second on the telemetry pipeline with all other telemetry events. If sampling mode is set to &#39;transaction&#39;, this value is ignored.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-telemetry-query-sampling-mode" class="anchored"><code>sql.telemetry.query_sampling.mode</code></div></td><td>enumeration</td><td><code>statement</code></td><td>the execution level used for telemetry sampling. If set to &#39;statement&#39;, events are sampled at the statement execution level. If set to &#39;transaction&#39;, events are sampled at the transaction execution level, i.e. all statements for a transaction will be logged and are counted together as one sampled event (events are still emitted one per statement). [statement = 0, transaction = 1]</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-telemetry-transaction-sampling-max-event-frequency" class="anchored"><code>sql.telemetry.transaction_sampling.max_event_frequency</code></div></td><td>integer</td><td><code>8</code></td><td>the max event frequency (events per second) at which we sample transactions for telemetry. If sampling mode is set to &#39;statement&#39;, this setting is ignored. In practice, this means that we only sample a transaction if 1/max_event_frequency seconds have elapsed since the last transaction was sampled.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-telemetry-transaction-sampling-statement-events-per-transaction-max" class="anchored"><code>sql.telemetry.transaction_sampling.statement_events_per_transaction.max</code></div></td><td>integer</td><td><code>50</code></td><td>the maximum number of statement events to log for every sampled transaction. Note that statements that are logged by force do not adhere to this limit.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-temp-object-cleaner-cleanup-interval" class="anchored"><code>sql.temp_object_cleaner.cleanup_interval</code></div></td><td>duration</td><td><code>30m0s</code></td><td>how often to clean up orphaned temporary objects</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-temp-object-cleaner-wait-interval" class="anchored"><code>sql.temp_object_cleaner.wait_interval</code></div></td><td>duration</td><td><code>30m0s</code></td><td>how long after creation a temporary object will be cleaned up</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-sql-trace-log-statement-execute" class="anchored"><code>sql.log.all_statements.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>set to true to enable logging of all executed statements</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
Expand Down
17 changes: 14 additions & 3 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,6 @@ func NewServer(
}

telemetryLoggingMetrics := newTelemetryLoggingMetrics(cfg.TelemetryLoggingTestingKnobs, cfg.Settings)
telemetryLoggingMetrics.registerOnTelemetrySamplingModeChange(cfg.Settings)
s.TelemetryLoggingMetrics = telemetryLoggingMetrics

sqlStatsInternalExecutorMonitor := MakeInternalExecutorMemMonitor(MemoryMetrics{}, s.GetExecutorConfig().Settings)
Expand Down Expand Up @@ -1566,6 +1565,13 @@ type connExecutor struct {
// createdSequences keeps track of sequences created in the current transaction.
// The map key is the sequence descpb.ID.
createdSequences map[descpb.ID]struct{}

// shouldLogToTelemetry indicates if the current transaction should be
// logged to telemetry. It is used in telemetry transaction sampling
// mode to emit all statement events for a particular transaction.
// Note that the number of statement events emitted per transaction is
// capped by the cluster setting telemetryStatementsPerTransactionMax.
shouldLogToTelemetry bool
}

// sessionDataStack contains the user-configurable connection variables.
Expand Down Expand Up @@ -2014,7 +2020,11 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent, pay
ex.state.mu.Lock()
defer ex.state.mu.Unlock()
ex.state.mu.stmtCount = 0
tracingEnabled := ex.planner.ExtendedEvalContext().Tracing.Enabled()
ex.extraTxnState.shouldLogToTelemetry =
ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(tracingEnabled, ex.executorType == executorTypeInternal)
}

// NOTE: on txnRestart we don't need to muck with the savepoints stack. It's either a
// a ROLLBACK TO SAVEPOINT that generated the event, and that statement deals with the
// savepoints, or it's a rewind which also deals with them.
Expand Down Expand Up @@ -2809,7 +2819,7 @@ func (ex *connExecutor) execCopyOut(
stmtFingerprintID,
&stats,
ex.statsCollector,
)
ex.extraTxnState.shouldLogToTelemetry)
}()

stmtTS := ex.server.cfg.Clock.PhysicalTime()
Expand Down Expand Up @@ -3069,7 +3079,8 @@ func (ex *connExecutor) execCopyIn(
ex.server.TelemetryLoggingMetrics,
stmtFingerprintID,
&stats,
ex.statsCollector)
ex.statsCollector,
ex.extraTxnState.shouldLogToTelemetry)
}()

var copyErr error
Expand Down
17 changes: 10 additions & 7 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ func (ex *connExecutor) execStmtInOpenState(
stmtFingerprintID,
&topLevelQueryStats{},
ex.statsCollector,
)
ex.extraTxnState.shouldLogToTelemetry)
}()

switch s := ast.(type) {
Expand Down Expand Up @@ -1797,7 +1797,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ppInfo.dispatchToExecutionEngine.stmtFingerprintID,
ppInfo.dispatchToExecutionEngine.queryStats,
ex.statsCollector,
)
ex.extraTxnState.shouldLogToTelemetry)
},
})
} else {
Expand All @@ -1824,7 +1824,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
stmtFingerprintID,
&stats,
ex.statsCollector,
)
ex.extraTxnState.shouldLogToTelemetry)
}
}()

Expand Down Expand Up @@ -2503,7 +2503,7 @@ func (ex *connExecutor) execStmtInNoTxnState(
stmtFingerprintID,
&topLevelQueryStats{},
ex.statsCollector,
)
ex.extraTxnState.shouldLogToTelemetry)
}()

// We're in the NoTxn state, so no statements were executed earlier. Bump the
Expand Down Expand Up @@ -3138,11 +3138,9 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent, txnErr err
}
ex.server.ServerMetrics.StatsMetrics.DiscardedStatsCount.Inc(1)
}

// If we have a commitTimestamp, we should use it.
ex.previousTransactionCommitTimestamp.Forward(ev.commitTimestamp)
if telemetryLoggingEnabled.Get(&ex.server.cfg.Settings.SV) {
ex.server.TelemetryLoggingMetrics.onTxnFinish(ev.txnID.String())
}
}
}

Expand All @@ -3163,6 +3161,7 @@ func (ex *connExecutor) onTxnRestart(ctx context.Context) {
if ex.server.cfg.TestingKnobs.BeforeRestart != nil {
ex.server.cfg.TestingKnobs.BeforeRestart(ctx, ex.state.mu.autoRetryReason)
}

}
}

Expand All @@ -3175,6 +3174,10 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) {
TxnFingerprintID: appstatspb.InvalidTransactionFingerprintID,
})

tracingEnabled := ex.planner.ExtendedEvalContext().Tracing.Enabled()
ex.extraTxnState.shouldLogToTelemetry =
ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(tracingEnabled, ex.executorType == executorTypeInternal)

ex.state.mu.RLock()
txnStart := ex.state.mu.txnStart
ex.state.mu.RUnlock()
Expand Down
39 changes: 24 additions & 15 deletions pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ const (
executorTypeInternal
)

// shouldForceLogStatement returns true if the statement should be force logged to
// TELEMETRY. Currently the criteria is if the statement is not of type DML and is
// not BEGIN or COMMIT.
func shouldForceLogStatement(ast tree.Statement) bool {
switch ast.StatementTag() {
case "BEGIN", "COMMIT":
return false
default:
return ast.StatementType() != tree.TypeDML
}
}

// vLevel returns the vmodule log level at which logs from the given executor
// should be written to the logs.
func (s executorType) vLevel() log.Level { return log.Level(s) + 2 }
Expand All @@ -132,12 +144,13 @@ func (p *planner) maybeLogStatement(
stmtFingerprintID appstatspb.StmtFingerprintID,
queryStats *topLevelQueryStats,
statsCollector sqlstats.StatsCollector,
shouldLogToTelemetry bool,
) {
p.maybeAuditRoleBasedAuditEvent(ctx, execType)
p.maybeLogStatementInternal(ctx, execType, numRetries, txnCounter,
rows, stmtCount, bulkJobId, err, queryReceived, hasAdminRoleCache,
telemetryLoggingMetrics, stmtFingerprintID, queryStats, statsCollector,
)
shouldLogToTelemetry)
}

func (p *planner) maybeLogStatementInternal(
Expand All @@ -152,6 +165,7 @@ func (p *planner) maybeLogStatementInternal(
stmtFingerprintID appstatspb.StmtFingerprintID,
topLevelQueryStats *topLevelQueryStats,
statsCollector sqlstats.StatsCollector,
shouldLogToTelemetry bool,
) {
// Note: if you find the code below crashing because p.execCfg == nil,
// do not add a test "if p.execCfg == nil { do nothing }" !
Expand Down Expand Up @@ -282,25 +296,14 @@ func (p *planner) maybeLogStatementInternal(
// the last event emission.
tracingEnabled := telemetryMetrics.isTracing(p.curPlan.instrumentation.Tracing())

isStmtMode := telemetrySamplingMode.Get(&p.execCfg.Settings.SV) == telemetryModeStatement

// Always sample if one of the scenarios is true:
// - on 'statement' sampling and the current statement is not of type DML
// - on 'transaction' sampling mode and the current statement is not of type DML and is not a COMMIT
// - statement is not of type DML and is not BEGIN or COMMIT
// - tracing is enabled for this statement
// - this is a query emitted by our console (application_name starts with `$ internal-console`) and
// the cluster setting to log console queries is enabled
forceLog := (p.stmt.AST.StatementType() != tree.TypeDML &&
(isStmtMode || p.stmt.AST.StatementTag() != "COMMIT")) ||
tracingEnabled || logConsoleQuery

var txnID string
// p.txn can be nil for COPY.
if p.txn != nil {
txnID = p.txn.ID().String()
}
forceLog := shouldForceLogStatement(p.stmt.AST) || tracingEnabled || logConsoleQuery

if telemetryMetrics.shouldEmitStatementLog(telemetryMetrics.timeNow(), txnID, forceLog, stmtCount) {
if telemetryMetrics.shouldEmitStatementLog(shouldLogToTelemetry, stmtCount, forceLog) {
var queryLevelStats execstats.QueryLevelStats
if stats, ok := p.instrumentation.GetQueryLevelStats(); ok {
queryLevelStats = *stats
Expand All @@ -312,6 +315,12 @@ func (p *planner) maybeLogStatementInternal(
indexRecs = append(indexRecs, rec.SQL)
}

var txnID string
// p.txn can be nil for COPY.
if p.txn != nil {
txnID = p.txn.ID().String()
}

phaseTimes := statsCollector.PhaseTimes()

// Collect the statistics.
Expand Down
Loading

0 comments on commit 1dde7c0

Please sign in to comment.