Skip to content

Commit

Permalink
telemetry: log transaction exec events to TELEMETRY
Browse files Browse the repository at this point in the history
This commit sends SampledTransaction events to the telemetry channel.
In "transaction" sampling mode, if a transaction is marked to be logged
to telemetry, we will emit a SampledTranaction event on transaction
end containing transaction level stats. It is expected that if a transaction
event exists in telemetry, its statement events will also have been logged
(with a maximum number according to the setting
sql.telemetry.transaction_sampling.statement_events_per_transaction.max).

Closes: cockroachdb#108284

Release note (ops change): Transactions sampled for the telemetry logging
channel will now emit a SampledTransaction event. To sample transactions,
set the cluster setting `sql.telemetry.query_sampling.mode = 'transaction'`
and enable telemetry logging via `sql.telemetry.query_sampling.enabled = true`.
  • Loading branch information
xinhaoz committed Jan 19, 2024
1 parent d5b8f06 commit b0045ce
Show file tree
Hide file tree
Showing 11 changed files with 603 additions and 95 deletions.
12 changes: 9 additions & 3 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1572,6 +1572,10 @@ type connExecutor struct {
// Note that the number of statement events emitted per transaction is
// capped by the cluster setting telemetryStatementsPerTransactionMax.
shouldLogToTelemetry bool

// telemetrySkippedTxns contains the number of transactions skipped by
// telemetry logging prior to this one.
telemetrySkippedTxns uint64
}

// sessionDataStack contains the user-configurable connection variables.
Expand Down Expand Up @@ -2020,9 +2024,11 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent, pay
ex.state.mu.Lock()
defer ex.state.mu.Unlock()
ex.state.mu.stmtCount = 0
tracingEnabled := ex.planner.ExtendedEvalContext().Tracing.Enabled()
ex.extraTxnState.shouldLogToTelemetry =
ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(tracingEnabled, ex.executorType == executorTypeInternal)
forceSampling := ex.server.TelemetryLoggingMetrics.shouldForceTxnSampling(
ex.applicationName.Load().(string),
ex.planner.ExtendedEvalContext().Tracing.Enabled())
ex.extraTxnState.shouldLogToTelemetry, ex.extraTxnState.telemetrySkippedTxns =
ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(forceSampling, ex.executorType == executorTypeInternal)
}

// NOTE: on txnRestart we don't need to muck with the savepoints stack. It's either a
Expand Down
18 changes: 15 additions & 3 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3175,9 +3175,11 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) {
TxnFingerprintID: appstatspb.InvalidTransactionFingerprintID,
})

tracingEnabled := ex.planner.ExtendedEvalContext().Tracing.Enabled()
ex.extraTxnState.shouldLogToTelemetry =
ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(tracingEnabled, ex.executorType == executorTypeInternal)
forceSampling := ex.server.TelemetryLoggingMetrics.shouldForceTxnSampling(
ex.applicationName.Load().(string),
ex.planner.ExtendedEvalContext().Tracing.Enabled())
ex.extraTxnState.shouldLogToTelemetry, ex.extraTxnState.telemetrySkippedTxns =
ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(forceSampling, ex.executorType == executorTypeInternal)

ex.state.mu.RLock()
txnStart := ex.state.mu.txnStart
Expand Down Expand Up @@ -3298,6 +3300,16 @@ func (ex *connExecutor) recordTransactionFinish(

ex.maybeRecordRetrySerializableContention(ev.txnID, transactionFingerprintID, txnErr)

ex.planner.maybeLogTransaction(ctx,
ex.executorType,
int(ex.extraTxnState.txnCounter.Load()),
transactionFingerprintID,
&recordedTxnStats,
ex.server.TelemetryLoggingMetrics,
ex.extraTxnState.shouldLogToTelemetry,
ex.extraTxnState.telemetrySkippedTxns,
)

return ex.statsCollector.RecordTransaction(
ctx,
transactionFingerprintID,
Expand Down
97 changes: 97 additions & 0 deletions pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,103 @@ func (p *planner) maybeLogStatementInternal(
}
}

// maybeLogTransaction conditionally records the current transaction
// to the TELEMETRY channel.
func (p *planner) maybeLogTransaction(
ctx context.Context,
execType executorType,
txnCounter int,
txnFingerprintID appstatspb.TransactionFingerprintID,
txnStats *sqlstats.RecordedTxnStats,
telemetryLoggingMetrics *telemetryLoggingMetrics,
trackedForTelemetry bool,
skippedTransactions uint64,
) {
isTxnTelemetryMode := telemetrySamplingMode.Get(&p.execCfg.Settings.SV) == telemetryModeTransaction

// Exit if transaction telemetry logging is not enabled for this statement type.
// Note that this prevents us from incrementing the skipped transaction counter, which
// should only be incremented when telemetry is enabled.
if !telemetryLoggingEnabled.Get(&p.execCfg.Settings.SV) || !isTxnTelemetryMode ||
(execType == executorTypeInternal && !telemetryInternalQueriesEnabled.Get(&p.execCfg.Settings.SV)) {
return
}

if !trackedForTelemetry {
return
}

// Redact error messages.
var execErrStr, retryErr redact.RedactableString
sqlErrState := ""
if txnStats.TxnErr != nil {
execErrStr = redact.Sprint(txnStats.TxnErr)
sqlErrState = pgerror.GetPGCode(txnStats.TxnErr).String()
}

if txnStats.AutoRetryReason != nil {
retryErr = redact.Sprint(txnStats.AutoRetryReason)
}

sampledTxn := getSampledTransaction()
defer releaseSampledTransaction(sampledTxn)

*sampledTxn = eventpb.SampledTransaction{
SkippedTransactions: int64(skippedTransactions),
User: txnStats.SessionData.SessionUser().Normalized(),
ApplicationName: txnStats.SessionData.ApplicationName,
TxnCounter: uint32(txnCounter),
SessionID: txnStats.SessionID.String(),
TransactionID: txnStats.TransactionID.String(),
TransactionFingerprintID: txnFingerprintID,
Committed: txnStats.Committed,
ImplicitTxn: txnStats.ImplicitTxn,
StartTimeUnixNanos: txnStats.StartTime.UnixNano(),
EndTimeUnixNanos: txnStats.EndTime.UnixNano(),
ServiceLatNanos: txnStats.ServiceLatency.Nanoseconds(),
SQLSTATE: sqlErrState,
ErrorText: execErrStr,
NumRetries: txnStats.RetryCount,
LastAutoRetryReason: retryErr,
StatementFingerprintIDs: txnStats.StatementFingerprintIDs,
NumRows: int64(txnStats.RowsAffected),
RetryLatNanos: txnStats.RetryLatency.Nanoseconds(),
CommitLatNanos: txnStats.CommitLatency.Nanoseconds(),
IdleLatNanos: txnStats.IdleLatency.Nanoseconds(),
BytesRead: txnStats.BytesRead,
RowsRead: txnStats.RowsRead,
RowsWritten: txnStats.RowsWritten,
}

if txnStats.CollectedExecStats {
sampledTxn.SampledExecStats = &eventpb.SampledExecStats{
NetworkBytes: txnStats.ExecStats.NetworkBytesSent,
MaxMemUsage: txnStats.ExecStats.MaxMemUsage,
ContentionTime: int64(txnStats.ExecStats.ContentionTime.Seconds()),
NetworkMessages: txnStats.ExecStats.NetworkMessages,
MaxDiskUsage: txnStats.ExecStats.MaxDiskUsage,
CPUSQLNanos: txnStats.ExecStats.CPUTime.Nanoseconds(),
MVCCIteratorStats: eventpb.MVCCIteratorStats{
StepCount: txnStats.ExecStats.MvccSteps,
StepCountInternal: txnStats.ExecStats.MvccStepsInternal,
SeekCount: txnStats.ExecStats.MvccSeeks,
SeekCountInternal: txnStats.ExecStats.MvccSeeksInternal,
BlockBytes: txnStats.ExecStats.MvccBlockBytes,
BlockBytesInCache: txnStats.ExecStats.MvccBlockBytesInCache,
KeyBytes: txnStats.ExecStats.MvccKeyBytes,
ValueBytes: txnStats.ExecStats.MvccValueBytes,
PointCount: txnStats.ExecStats.MvccPointCount,
PointsCoveredByRangeTombstones: txnStats.ExecStats.MvccPointsCoveredByRangeTombstones,
RangeKeyCount: txnStats.ExecStats.MvccRangeKeyCount,
RangeKeyContainedPoints: txnStats.ExecStats.MvccRangeKeyContainedPoints,
RangeKeySkippedPoints: txnStats.ExecStats.MvccRangeKeySkippedPoints,
},
}
}

log.StructuredEvent(ctx, sampledTxn)
}

func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...logpb.EventPayload) {
// The API contract for logEventsWithOptions() is that it returns
// no error when system.eventlog is not written to.
Expand Down
61 changes: 50 additions & 11 deletions pkg/sql/telemetry_datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package sql

import (
"context"
"fmt"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -70,6 +71,13 @@ func TestTelemetryLoggingDataDriven(t *testing.T) {
cleanup := log.InterceptWith(ctx, stmtSpy)
defer cleanup()

txnsSpy := logtestutils.NewSampledTransactionLogScrubVolatileFields(t)
txnsSpy.AddFilter(func(ev logpb.Entry) bool {
return strings.Contains(ev.Message, appName)
})
cleanupTxnSpy := log.InterceptWith(ctx, txnsSpy)
defer cleanupTxnSpy()

datadriven.Walk(t, datapathutils.TestDataPath(t, "telemetry_logging/logging"), func(t *testing.T, path string) {
stmtSpy.Reset()

Expand Down Expand Up @@ -97,12 +105,21 @@ func TestTelemetryLoggingDataDriven(t *testing.T) {

telemetryLogging := s.SQLServer().(*Server).TelemetryLoggingMetrics
setupConn := s.SQLConn(t)
_, err := setupConn.Exec("CREATE USER testuser")
require.NoError(t, err)

spiedConnRootUser := s.SQLConn(t)
spiedConnTestUser := s.SQLConn(t, serverutils.User("testuser"))
spiedConn := spiedConnRootUser

spiedConn := s.SQLConn(t)
_, err := spiedConn.Exec("SET application_name = $1", appName)
// Set spied connections to the app name observed by the log spy.
_, err = spiedConn.Exec("SET application_name = $1", appName)
require.NoError(t, err)
_, err = spiedConnTestUser.Exec("SET application_name = $1", appName)
require.NoError(t, err)

datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
spiedConn = spiedConnRootUser
switch d.Cmd {
case "exec-sql":
sts.SetTracingStatus(false)
Expand All @@ -112,7 +129,8 @@ func TestTelemetryLoggingDataDriven(t *testing.T) {
}
return ""
case "spy-sql":
logCount := stmtSpy.Count()
stmtLogCount := stmtSpy.Count()
txnLogCount := txnsSpy.Count()
var stubTimeUnixSecs float64
var tracing bool

Expand All @@ -136,18 +154,38 @@ func TestTelemetryLoggingDataDriven(t *testing.T) {
}
st.SetTime(timeutil.FromUnixMicros(stubTimeMicros))

// Setup the sql user.
user := "root"
d.MaybeScanArgs(t, "user", &user)
switch user {
case "root":
case "testuser":
spiedConn = spiedConnTestUser
}

// Execute query input.
_, err := spiedConn.Exec(d.Input)
var sb strings.Builder

if err != nil {
return err.Error()
sb.WriteString(err.Error())
sb.WriteString("\n")
}

newStmtLogCount := stmtSpy.Count()
sb.WriteString(stmtSpy.GetLastNLogs(newStmtLogCount - stmtLogCount))
if newStmtLogCount > stmtLogCount {
sb.WriteString("\n")
}

// Display any new statement logs have been generated since executing the query.
newLogCount := stmtSpy.Count()
return stmtSpy.GetLastNLogs(newLogCount - logCount)
newTxnLogCount := txnsSpy.Count()
sb.WriteString(txnsSpy.GetLastNLogs(newTxnLogCount - txnLogCount))
return sb.String()
case "reset-last-sampled":
telemetryLogging.resetLastSampledTime()
return ""
case "show-skipped-transactions":
return strconv.FormatUint(telemetryLogging.getSkippedTransactionCount(), 10)
default:
t.Fatal("unknown command")
return ""
Expand Down Expand Up @@ -201,6 +239,7 @@ func TestTelemetryLoggingDecision(t *testing.T) {
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
t.Cleanup(func() {
telemetryLogging.resetLastSampledTime()
telemetryLogging.resetCounters()
})

switch d.Cmd {
Expand Down Expand Up @@ -238,8 +277,8 @@ func TestTelemetryLoggingDecision(t *testing.T) {
d.ScanArgs(t, "force", &force)
st.SetTime(timeutil.FromUnixMicros(int64(unixSecs * 1e6)))

shouldEmit, _ := telemetryLogging.shouldEmitStatementLog(isTrackedTxn, stmtNum, force)
return strconv.FormatBool(shouldEmit)
shouldEmit, skipped := telemetryLogging.shouldEmitStatementLog(isTrackedTxn, stmtNum, force)
return fmt.Sprintf(`emit: %t, skippedQueries: %d`, shouldEmit, skipped)
case "shouldEmitTransactionLog":
var unixSecs float64
var force, isInternal bool
Expand All @@ -248,8 +287,8 @@ func TestTelemetryLoggingDecision(t *testing.T) {
d.MaybeScanArgs(t, "isInternal", &isInternal)
st.SetTime(timeutil.FromUnixMicros(int64(unixSecs * 1e6)))

shouldEmit := telemetryLogging.shouldEmitTransactionLog(force, isInternal)
return strconv.FormatBool(shouldEmit)
shouldEmit, skipped := telemetryLogging.shouldEmitTransactionLog(force, isInternal)
return fmt.Sprintf(`emit: %t, skippedTxns: %d`, shouldEmit, skipped)
default:
t.Fatal("unknown command")
return ""
Expand Down
Loading

0 comments on commit b0045ce

Please sign in to comment.