Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

telemetry: log transaction exec events to TELEMETRY #115722

Merged
merged 4 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/generated/eventlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3081,6 +3081,7 @@ An event of type `sampled_transaction` is the event logged to telemetry at the e
| `RowsRead` | RowsRead is the number of rows read from disk. | no |
| `RowsWritten` | RowsWritten is the number of rows written to disk. | no |
| `SampledExecStats` | SampledExecStats is a nested field containing execution statistics. This field will be omitted if the stats were not sampled. | yes |
| `SkippedTransactions` | SkippedTransactions is the number of transactions that were skipped as part of sampling prior to this one. We only count skipped transactions when telemetry logging is enabled and the sampling mode is set to "transaction". | no |


#### Common fields
Expand Down
12 changes: 9 additions & 3 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1562,6 +1562,10 @@ type connExecutor struct {
// Note that the number of statement events emitted per transaction is
// capped by the cluster setting telemetryStatementsPerTransactionMax.
shouldLogToTelemetry bool

// telemetrySkippedTxns contains the number of transactions skipped by
// telemetry logging prior to this one.
telemetrySkippedTxns uint64
}

// sessionDataStack contains the user-configurable connection variables.
Expand Down Expand Up @@ -2010,9 +2014,11 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent, pay
ex.state.mu.Lock()
defer ex.state.mu.Unlock()
ex.state.mu.stmtCount = 0
tracingEnabled := ex.planner.ExtendedEvalContext().Tracing.Enabled()
ex.extraTxnState.shouldLogToTelemetry =
ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(tracingEnabled, ex.executorType == executorTypeInternal)
isTracing := ex.planner.ExtendedEvalContext().Tracing.Enabled()
ex.extraTxnState.shouldLogToTelemetry, ex.extraTxnState.telemetrySkippedTxns =
ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(isTracing,
ex.executorType == executorTypeInternal,
ex.applicationName.Load().(string))
}

// NOTE: on txnRestart we don't need to muck with the savepoints stack. It's either a
Expand Down
18 changes: 15 additions & 3 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3182,9 +3182,11 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) {
TxnFingerprintID: appstatspb.InvalidTransactionFingerprintID,
})

tracingEnabled := ex.planner.ExtendedEvalContext().Tracing.Enabled()
ex.extraTxnState.shouldLogToTelemetry =
ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(tracingEnabled, ex.executorType == executorTypeInternal)
isTracing := ex.planner.ExtendedEvalContext().Tracing.Enabled()
ex.extraTxnState.shouldLogToTelemetry, ex.extraTxnState.telemetrySkippedTxns =
ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(isTracing,
ex.executorType == executorTypeInternal,
ex.applicationName.Load().(string))

ex.state.mu.RLock()
txnStart := ex.state.mu.txnStart
Expand Down Expand Up @@ -3305,6 +3307,16 @@ func (ex *connExecutor) recordTransactionFinish(

ex.maybeRecordRetrySerializableContention(ev.txnID, transactionFingerprintID, txnErr)

if ex.extraTxnState.shouldLogToTelemetry {

ex.planner.logTransaction(ctx,
int(ex.extraTxnState.txnCounter.Load()),
transactionFingerprintID,
&recordedTxnStats,
ex.extraTxnState.telemetrySkippedTxns,
)
}

return ex.statsCollector.RecordTransaction(
ctx,
transactionFingerprintID,
Expand Down
89 changes: 86 additions & 3 deletions pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (p *planner) maybeLogStatementInternal(
slowInternalQueryLogEnabled := slowInternalQueryLogEnabled.Get(&p.execCfg.Settings.SV)
auditEventsDetected := len(p.curPlan.auditEventBuilders) != 0
logConsoleQuery := telemetryInternalConsoleQueriesEnabled.Get(&p.execCfg.Settings.SV) &&
strings.HasPrefix(p.SessionData().ApplicationName, "$ internal-console")
strings.HasPrefix(p.SessionData().ApplicationName, internalConsoleAppName)

// We only consider non-internal SQL statements for telemetry logging unless
// the telemetryInternalQueriesEnabled is true.
Expand Down Expand Up @@ -355,7 +355,10 @@ func (p *planner) maybeLogStatementInternal(
})
}

sampledQuery := eventpb.SampledQuery{
sampledQuery := getSampledQuery()
defer releaseSampledQuery(sampledQuery)

*sampledQuery = eventpb.SampledQuery{
CommonSQLExecDetails: execDetails,
SkippedQueries: skippedQueries,
CostEstimate: p.curPlan.instrumentation.costEstimate,
Expand Down Expand Up @@ -431,10 +434,90 @@ func (p *planner) maybeLogStatementInternal(
SchemaChangerMode: p.curPlan.instrumentation.schemaChangerMode.String(),
}

p.logOperationalEventsOnlyExternally(ctx, &sampledQuery)
p.logOperationalEventsOnlyExternally(ctx, sampledQuery)
}
}

// logTransaction records the current transaction to the TELEMETRY channel.
func (p *planner) logTransaction(
ctx context.Context,
txnCounter int,
txnFingerprintID appstatspb.TransactionFingerprintID,
txnStats *sqlstats.RecordedTxnStats,
skippedTransactions uint64,
) {

// Redact error messages.
var execErrStr, retryErr redact.RedactableString
sqlErrState := ""
if txnStats.TxnErr != nil {
execErrStr = redact.Sprint(txnStats.TxnErr)
sqlErrState = pgerror.GetPGCode(txnStats.TxnErr).String()
}

if txnStats.AutoRetryReason != nil {
retryErr = redact.Sprint(txnStats.AutoRetryReason)
}

sampledTxn := getSampledTransaction()
defer releaseSampledTransaction(sampledTxn)

*sampledTxn = eventpb.SampledTransaction{
SkippedTransactions: int64(skippedTransactions),
User: txnStats.SessionData.SessionUser().Normalized(),
ApplicationName: txnStats.SessionData.ApplicationName,
TxnCounter: uint32(txnCounter),
SessionID: txnStats.SessionID.String(),
TransactionID: txnStats.TransactionID.String(),
TransactionFingerprintID: txnFingerprintID,
Committed: txnStats.Committed,
ImplicitTxn: txnStats.ImplicitTxn,
StartTimeUnixNanos: txnStats.StartTime.UnixNano(),
EndTimeUnixNanos: txnStats.EndTime.UnixNano(),
ServiceLatNanos: txnStats.ServiceLatency.Nanoseconds(),
SQLSTATE: sqlErrState,
ErrorText: execErrStr,
NumRetries: txnStats.RetryCount,
LastAutoRetryReason: retryErr,
StatementFingerprintIDs: txnStats.StatementFingerprintIDs,
NumRows: int64(txnStats.RowsAffected),
RetryLatNanos: txnStats.RetryLatency.Nanoseconds(),
CommitLatNanos: txnStats.CommitLatency.Nanoseconds(),
IdleLatNanos: txnStats.IdleLatency.Nanoseconds(),
BytesRead: txnStats.BytesRead,
RowsRead: txnStats.RowsRead,
RowsWritten: txnStats.RowsWritten,
}

if txnStats.CollectedExecStats {
sampledTxn.SampledExecStats = &eventpb.SampledExecStats{
NetworkBytes: txnStats.ExecStats.NetworkBytesSent,
MaxMemUsage: txnStats.ExecStats.MaxMemUsage,
ContentionTime: int64(txnStats.ExecStats.ContentionTime.Seconds()),
NetworkMessages: txnStats.ExecStats.NetworkMessages,
MaxDiskUsage: txnStats.ExecStats.MaxDiskUsage,
CPUSQLNanos: txnStats.ExecStats.CPUTime.Nanoseconds(),
MVCCIteratorStats: eventpb.MVCCIteratorStats{
StepCount: txnStats.ExecStats.MvccSteps,
StepCountInternal: txnStats.ExecStats.MvccStepsInternal,
SeekCount: txnStats.ExecStats.MvccSeeks,
SeekCountInternal: txnStats.ExecStats.MvccSeeksInternal,
BlockBytes: txnStats.ExecStats.MvccBlockBytes,
BlockBytesInCache: txnStats.ExecStats.MvccBlockBytesInCache,
KeyBytes: txnStats.ExecStats.MvccKeyBytes,
ValueBytes: txnStats.ExecStats.MvccValueBytes,
PointCount: txnStats.ExecStats.MvccPointCount,
PointsCoveredByRangeTombstones: txnStats.ExecStats.MvccPointsCoveredByRangeTombstones,
RangeKeyCount: txnStats.ExecStats.MvccRangeKeyCount,
RangeKeyContainedPoints: txnStats.ExecStats.MvccRangeKeyContainedPoints,
RangeKeySkippedPoints: txnStats.ExecStats.MvccRangeKeySkippedPoints,
},
}
}

log.StructuredEvent(ctx, sampledTxn)
}

func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...logpb.EventPayload) {
// The API contract for logEventsWithOptions() is that it returns
// no error when system.eventlog is not written to.
Expand Down
Loading
Loading