Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
115722: telemetry: log transaction exec events to TELEMETRY r=xinhaoz a=xinhaoz

### 1. [sql/telemetry: add SkippedTransactions to SampledTransaction proto](a356e63) 

This commit adds the field SkippedTransactions to the
SampledTransaction protobuf to count the number of transactions
that were not sampled while telemetry transaction logging
is enabled. The corresponding field is added to the
telemetryLogging struct and will be used in the following
commit to track skipped transactions. Some whitespace in the
SampledTransaction proto definition is adjusted.

Epic: none

Release note (sql change): New field `SkippedTransactions` in
the SampledTransaction event, which is emitted to the TELEMETRY
logging channel when telemetry logging is enabled and set to
"transaction" mode.

### 2. [eventpb: make MVCCIteratorStats in SampledExecStats non-nullable](6610467) 

This field should always exist in SampledExecStats. Since
SampledTransaction is the only user of this message right now
and is yet to be used we can safely change the proto definition.

Epic: none

Release note: None

### 3. [eventpb: make MVCCIteratorStats in SampledExecStats non-nullable](6610467) 

This field should always exist in SampledExecStats. Since
SampledTransaction is the only user of this message right now
and is yet to be used we can safely change the proto definition.

Epic: none

Release note: None

### 4.  [telemetry: log transaction exec events to TELEMETRY](cc0d1bb)

This commit sends SampledTransaction events to the telemetry channel.
In "transaction" sampling mode, if a transaction is marked to be logged
to telemetry, we will emit a SampledTranaction event on transaction
end containing transaction level stats. It is expected that if a transaction
event exists in telemetry, its statement events will also have been logged
(with a maximum number according to the setting
sql.telemetry.transaction_sampling.statement_events_per_transaction.max.

Closes: #108284

Release note (ops change): Transactions sampled for the telemetry logging
channel will now emit a SampledTransaction event. To sample transactions,
set the cluster setting `sql.telemetry.query_sampling.mode = 'transaction'`
and enable telemetry logging via `sql.telemetry.query_sampling.enabled = true`.

118325: datapathutils: update comment for `DebuggableTempDir` r=rail a=rickystewart

Improve some of the wording here. Also I accidentally wrote "temp" instead of "test" which is confusing.

Epic: none
Release note: None

Co-authored-by: Xin Hao Zhang <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
  • Loading branch information
3 people committed Jan 26, 2024
3 parents 2ab11ef + 9199146 + bbc19da commit e3274e3
Show file tree
Hide file tree
Showing 16 changed files with 925 additions and 172 deletions.
1 change: 1 addition & 0 deletions docs/generated/eventlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3081,6 +3081,7 @@ An event of type `sampled_transaction` is the event logged to telemetry at the e
| `RowsRead` | RowsRead is the number of rows read from disk. | no |
| `RowsWritten` | RowsWritten is the number of rows written to disk. | no |
| `SampledExecStats` | SampledExecStats is a nested field containing execution statistics. This field will be omitted if the stats were not sampled. | yes |
| `SkippedTransactions` | SkippedTransactions is the number of transactions that were skipped as part of sampling prior to this one. We only count skipped transactions when telemetry logging is enabled and the sampling mode is set to "transaction". | no |


#### Common fields
Expand Down
12 changes: 9 additions & 3 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1562,6 +1562,10 @@ type connExecutor struct {
// Note that the number of statement events emitted per transaction is
// capped by the cluster setting telemetryStatementsPerTransactionMax.
shouldLogToTelemetry bool

// telemetrySkippedTxns contains the number of transactions skipped by
// telemetry logging prior to this one.
telemetrySkippedTxns uint64
}

// sessionDataStack contains the user-configurable connection variables.
Expand Down Expand Up @@ -2010,9 +2014,11 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent, pay
ex.state.mu.Lock()
defer ex.state.mu.Unlock()
ex.state.mu.stmtCount = 0
tracingEnabled := ex.planner.ExtendedEvalContext().Tracing.Enabled()
ex.extraTxnState.shouldLogToTelemetry =
ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(tracingEnabled, ex.executorType == executorTypeInternal)
isTracing := ex.planner.ExtendedEvalContext().Tracing.Enabled()
ex.extraTxnState.shouldLogToTelemetry, ex.extraTxnState.telemetrySkippedTxns =
ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(isTracing,
ex.executorType == executorTypeInternal,
ex.applicationName.Load().(string))
}

// NOTE: on txnRestart we don't need to muck with the savepoints stack. It's either a
Expand Down
18 changes: 15 additions & 3 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3182,9 +3182,11 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) {
TxnFingerprintID: appstatspb.InvalidTransactionFingerprintID,
})

tracingEnabled := ex.planner.ExtendedEvalContext().Tracing.Enabled()
ex.extraTxnState.shouldLogToTelemetry =
ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(tracingEnabled, ex.executorType == executorTypeInternal)
isTracing := ex.planner.ExtendedEvalContext().Tracing.Enabled()
ex.extraTxnState.shouldLogToTelemetry, ex.extraTxnState.telemetrySkippedTxns =
ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(isTracing,
ex.executorType == executorTypeInternal,
ex.applicationName.Load().(string))

ex.state.mu.RLock()
txnStart := ex.state.mu.txnStart
Expand Down Expand Up @@ -3305,6 +3307,16 @@ func (ex *connExecutor) recordTransactionFinish(

ex.maybeRecordRetrySerializableContention(ev.txnID, transactionFingerprintID, txnErr)

if ex.extraTxnState.shouldLogToTelemetry {

ex.planner.logTransaction(ctx,
int(ex.extraTxnState.txnCounter.Load()),
transactionFingerprintID,
&recordedTxnStats,
ex.extraTxnState.telemetrySkippedTxns,
)
}

return ex.statsCollector.RecordTransaction(
ctx,
transactionFingerprintID,
Expand Down
89 changes: 86 additions & 3 deletions pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (p *planner) maybeLogStatementInternal(
slowInternalQueryLogEnabled := slowInternalQueryLogEnabled.Get(&p.execCfg.Settings.SV)
auditEventsDetected := len(p.curPlan.auditEventBuilders) != 0
logConsoleQuery := telemetryInternalConsoleQueriesEnabled.Get(&p.execCfg.Settings.SV) &&
strings.HasPrefix(p.SessionData().ApplicationName, "$ internal-console")
strings.HasPrefix(p.SessionData().ApplicationName, internalConsoleAppName)

// We only consider non-internal SQL statements for telemetry logging unless
// the telemetryInternalQueriesEnabled is true.
Expand Down Expand Up @@ -355,7 +355,10 @@ func (p *planner) maybeLogStatementInternal(
})
}

sampledQuery := eventpb.SampledQuery{
sampledQuery := getSampledQuery()
defer releaseSampledQuery(sampledQuery)

*sampledQuery = eventpb.SampledQuery{
CommonSQLExecDetails: execDetails,
SkippedQueries: skippedQueries,
CostEstimate: p.curPlan.instrumentation.costEstimate,
Expand Down Expand Up @@ -431,10 +434,90 @@ func (p *planner) maybeLogStatementInternal(
SchemaChangerMode: p.curPlan.instrumentation.schemaChangerMode.String(),
}

p.logOperationalEventsOnlyExternally(ctx, &sampledQuery)
p.logOperationalEventsOnlyExternally(ctx, sampledQuery)
}
}

// logTransaction records the current transaction to the TELEMETRY channel.
func (p *planner) logTransaction(
ctx context.Context,
txnCounter int,
txnFingerprintID appstatspb.TransactionFingerprintID,
txnStats *sqlstats.RecordedTxnStats,
skippedTransactions uint64,
) {

// Redact error messages.
var execErrStr, retryErr redact.RedactableString
sqlErrState := ""
if txnStats.TxnErr != nil {
execErrStr = redact.Sprint(txnStats.TxnErr)
sqlErrState = pgerror.GetPGCode(txnStats.TxnErr).String()
}

if txnStats.AutoRetryReason != nil {
retryErr = redact.Sprint(txnStats.AutoRetryReason)
}

sampledTxn := getSampledTransaction()
defer releaseSampledTransaction(sampledTxn)

*sampledTxn = eventpb.SampledTransaction{
SkippedTransactions: int64(skippedTransactions),
User: txnStats.SessionData.SessionUser().Normalized(),
ApplicationName: txnStats.SessionData.ApplicationName,
TxnCounter: uint32(txnCounter),
SessionID: txnStats.SessionID.String(),
TransactionID: txnStats.TransactionID.String(),
TransactionFingerprintID: txnFingerprintID,
Committed: txnStats.Committed,
ImplicitTxn: txnStats.ImplicitTxn,
StartTimeUnixNanos: txnStats.StartTime.UnixNano(),
EndTimeUnixNanos: txnStats.EndTime.UnixNano(),
ServiceLatNanos: txnStats.ServiceLatency.Nanoseconds(),
SQLSTATE: sqlErrState,
ErrorText: execErrStr,
NumRetries: txnStats.RetryCount,
LastAutoRetryReason: retryErr,
StatementFingerprintIDs: txnStats.StatementFingerprintIDs,
NumRows: int64(txnStats.RowsAffected),
RetryLatNanos: txnStats.RetryLatency.Nanoseconds(),
CommitLatNanos: txnStats.CommitLatency.Nanoseconds(),
IdleLatNanos: txnStats.IdleLatency.Nanoseconds(),
BytesRead: txnStats.BytesRead,
RowsRead: txnStats.RowsRead,
RowsWritten: txnStats.RowsWritten,
}

if txnStats.CollectedExecStats {
sampledTxn.SampledExecStats = &eventpb.SampledExecStats{
NetworkBytes: txnStats.ExecStats.NetworkBytesSent,
MaxMemUsage: txnStats.ExecStats.MaxMemUsage,
ContentionTime: int64(txnStats.ExecStats.ContentionTime.Seconds()),
NetworkMessages: txnStats.ExecStats.NetworkMessages,
MaxDiskUsage: txnStats.ExecStats.MaxDiskUsage,
CPUSQLNanos: txnStats.ExecStats.CPUTime.Nanoseconds(),
MVCCIteratorStats: eventpb.MVCCIteratorStats{
StepCount: txnStats.ExecStats.MvccSteps,
StepCountInternal: txnStats.ExecStats.MvccStepsInternal,
SeekCount: txnStats.ExecStats.MvccSeeks,
SeekCountInternal: txnStats.ExecStats.MvccSeeksInternal,
BlockBytes: txnStats.ExecStats.MvccBlockBytes,
BlockBytesInCache: txnStats.ExecStats.MvccBlockBytesInCache,
KeyBytes: txnStats.ExecStats.MvccKeyBytes,
ValueBytes: txnStats.ExecStats.MvccValueBytes,
PointCount: txnStats.ExecStats.MvccPointCount,
PointsCoveredByRangeTombstones: txnStats.ExecStats.MvccPointsCoveredByRangeTombstones,
RangeKeyCount: txnStats.ExecStats.MvccRangeKeyCount,
RangeKeyContainedPoints: txnStats.ExecStats.MvccRangeKeyContainedPoints,
RangeKeySkippedPoints: txnStats.ExecStats.MvccRangeKeySkippedPoints,
},
}
}

log.StructuredEvent(ctx, sampledTxn)
}

func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...logpb.EventPayload) {
// The API contract for logEventsWithOptions() is that it returns
// no error when system.eventlog is not written to.
Expand Down
Loading

0 comments on commit e3274e3

Please sign in to comment.