Skip to content

Commit

Permalink
telemetry: log transaction exec events to TELEMETRY
Browse files Browse the repository at this point in the history
This commit sends SampledTransaction events to the telemetry channel.
In "transaction" sampling mode, if a transaction is marked to be logged
to telemetry, we will emit a SampledTranaction event on transaction
end containing transaction level stats. It is expected that if a transaction
event exists in telemetry, its statement events will also have been logged
(with a maximum number according to the setting
sql.telemetry.transaction_sampling.statement_events_per_transaction.max).
Transaction recording for telemetry is decided at the start of transaction
execution (including on restarts), and will not be refreshed for the remainder
of transaction execution.

Closes: cockroachdb#108284

Release note (ops change): Transactions sampled for the telemetry logging
channel will now emit a SampledTransaction event. To sample transactions,
set the cluster setting `sql.telemetry.query_sampling.mode = 'transaction'`
and enable telemetry logging via `sql.telemetry.query_sampling.enabled = true`.
  • Loading branch information
xinhaoz committed Jan 29, 2024
1 parent 037aa31 commit 9407cbb
Show file tree
Hide file tree
Showing 11 changed files with 833 additions and 139 deletions.
12 changes: 9 additions & 3 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1575,6 +1575,10 @@ type connExecutor struct {
// Note that the number of statement events emitted per transaction is
// capped by the cluster setting telemetryStatementsPerTransactionMax.
shouldLogToTelemetry bool

// telemetrySkippedTxns contains the number of transactions skipped by
// telemetry logging prior to this one.
telemetrySkippedTxns uint64
}

// sessionDataStack contains the user-configurable connection variables.
Expand Down Expand Up @@ -2023,9 +2027,11 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent, pay
ex.state.mu.Lock()
defer ex.state.mu.Unlock()
ex.state.mu.stmtCount = 0
tracingEnabled := ex.planner.ExtendedEvalContext().Tracing.Enabled()
ex.extraTxnState.shouldLogToTelemetry =
ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(tracingEnabled, ex.executorType == executorTypeInternal)
isTracing := ex.planner.ExtendedEvalContext().Tracing.Enabled()
ex.extraTxnState.shouldLogToTelemetry, ex.extraTxnState.telemetrySkippedTxns =
ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(isTracing,
ex.executorType == executorTypeInternal,
ex.applicationName.Load().(string))
}

// NOTE: on txnRestart we don't need to muck with the savepoints stack. It's either a
Expand Down
18 changes: 15 additions & 3 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3161,9 +3161,11 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) {
TxnFingerprintID: appstatspb.InvalidTransactionFingerprintID,
})

tracingEnabled := ex.planner.ExtendedEvalContext().Tracing.Enabled()
ex.extraTxnState.shouldLogToTelemetry =
ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(tracingEnabled, ex.executorType == executorTypeInternal)
isTracing := ex.planner.ExtendedEvalContext().Tracing.Enabled()
ex.extraTxnState.shouldLogToTelemetry, ex.extraTxnState.telemetrySkippedTxns =
ex.server.TelemetryLoggingMetrics.shouldEmitTransactionLog(isTracing,
ex.executorType == executorTypeInternal,
ex.applicationName.Load().(string))

ex.state.mu.RLock()
txnStart := ex.state.mu.txnStart
Expand Down Expand Up @@ -3284,6 +3286,16 @@ func (ex *connExecutor) recordTransactionFinish(

ex.maybeRecordRetrySerializableContention(ev.txnID, transactionFingerprintID, txnErr)

if ex.extraTxnState.shouldLogToTelemetry {

ex.planner.logTransaction(ctx,
ex.extraTxnState.txnCounter,
transactionFingerprintID,
&recordedTxnStats,
ex.extraTxnState.telemetrySkippedTxns,
)
}

return ex.statsCollector.RecordTransaction(
ctx,
transactionFingerprintID,
Expand Down
82 changes: 81 additions & 1 deletion pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (p *planner) maybeLogStatementInternal(
slowInternalQueryLogEnabled := slowInternalQueryLogEnabled.Get(&p.execCfg.Settings.SV)
auditEventsDetected := len(p.curPlan.auditEventBuilders) != 0
logConsoleQuery := telemetryInternalConsoleQueriesEnabled.Get(&p.execCfg.Settings.SV) &&
strings.HasPrefix(p.SessionData().ApplicationName, "$ internal-console")
strings.HasPrefix(p.SessionData().ApplicationName, internalConsoleAppName)

// We only consider non-internal SQL statements for telemetry logging unless
// the telemetryInternalQueriesEnabled is true.
Expand Down Expand Up @@ -438,6 +438,86 @@ func (p *planner) maybeLogStatementInternal(
}
}

// logTransaction records the current transaction to the TELEMETRY channel.
func (p *planner) logTransaction(
ctx context.Context,
txnCounter int,
txnFingerprintID appstatspb.TransactionFingerprintID,
txnStats *sqlstats.RecordedTxnStats,
skippedTransactions uint64,
) {

// Redact error messages.
var execErrStr, retryErr redact.RedactableString
sqlErrState := ""
if txnStats.TxnErr != nil {
execErrStr = redact.Sprint(txnStats.TxnErr)
sqlErrState = pgerror.GetPGCode(txnStats.TxnErr).String()
}

if txnStats.AutoRetryReason != nil {
retryErr = redact.Sprint(txnStats.AutoRetryReason)
}

sampledTxn := getSampledTransaction()
defer releaseSampledTransaction(sampledTxn)

*sampledTxn = eventpb.SampledTransaction{
SkippedTransactions: int64(skippedTransactions),
User: txnStats.SessionData.SessionUser().Normalized(),
ApplicationName: txnStats.SessionData.ApplicationName,
TxnCounter: uint32(txnCounter),
SessionID: txnStats.SessionID.String(),
TransactionID: txnStats.TransactionID.String(),
TransactionFingerprintID: txnFingerprintID,
Committed: txnStats.Committed,
ImplicitTxn: txnStats.ImplicitTxn,
StartTimeUnixNanos: txnStats.StartTime.UnixNano(),
EndTimeUnixNanos: txnStats.EndTime.UnixNano(),
ServiceLatNanos: txnStats.ServiceLatency.Nanoseconds(),
SQLSTATE: sqlErrState,
ErrorText: execErrStr,
NumRetries: txnStats.RetryCount,
LastAutoRetryReason: retryErr,
StatementFingerprintIDs: txnStats.StatementFingerprintIDs,
NumRows: int64(txnStats.RowsAffected),
RetryLatNanos: txnStats.RetryLatency.Nanoseconds(),
CommitLatNanos: txnStats.CommitLatency.Nanoseconds(),
IdleLatNanos: txnStats.IdleLatency.Nanoseconds(),
BytesRead: txnStats.BytesRead,
RowsRead: txnStats.RowsRead,
RowsWritten: txnStats.RowsWritten,
}

if txnStats.CollectedExecStats {
sampledTxn.SampledExecStats = &eventpb.SampledExecStats{
NetworkBytes: txnStats.ExecStats.NetworkBytesSent,
MaxMemUsage: txnStats.ExecStats.MaxMemUsage,
ContentionTime: int64(txnStats.ExecStats.ContentionTime.Seconds()),
NetworkMessages: txnStats.ExecStats.NetworkMessages,
MaxDiskUsage: txnStats.ExecStats.MaxDiskUsage,
CPUSQLNanos: txnStats.ExecStats.CPUTime.Nanoseconds(),
MVCCIteratorStats: eventpb.MVCCIteratorStats{
StepCount: txnStats.ExecStats.MvccSteps,
StepCountInternal: txnStats.ExecStats.MvccStepsInternal,
SeekCount: txnStats.ExecStats.MvccSeeks,
SeekCountInternal: txnStats.ExecStats.MvccSeeksInternal,
BlockBytes: txnStats.ExecStats.MvccBlockBytes,
BlockBytesInCache: txnStats.ExecStats.MvccBlockBytesInCache,
KeyBytes: txnStats.ExecStats.MvccKeyBytes,
ValueBytes: txnStats.ExecStats.MvccValueBytes,
PointCount: txnStats.ExecStats.MvccPointCount,
PointsCoveredByRangeTombstones: txnStats.ExecStats.MvccPointsCoveredByRangeTombstones,
RangeKeyCount: txnStats.ExecStats.MvccRangeKeyCount,
RangeKeyContainedPoints: txnStats.ExecStats.MvccRangeKeyContainedPoints,
RangeKeySkippedPoints: txnStats.ExecStats.MvccRangeKeySkippedPoints,
},
}
}

log.StructuredEvent(ctx, sampledTxn)
}

func (p *planner) logEventsOnlyExternally(ctx context.Context, entries ...logpb.EventPayload) {
// The API contract for logEventsWithOptions() is that it returns
// no error when system.eventlog is not written to.
Expand Down
101 changes: 78 additions & 23 deletions pkg/sql/telemetry_datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package sql

import (
"context"
"fmt"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -48,8 +49,9 @@ import (
* - unixSecs: sets the current time used for telemetry log sampling to the given unix time in seconds.
* If omitted, the current time is automatically changed by 0.1 seconds.
* - restartUnixSecs: sets the stub time on txn restarts.
*
* tracing: sets the tracing status to the given value
* - tracing: sets the tracing status to the given value. If omitted, the tracing status is set to false.
* - useRealTracing: if set, the real tracing status is used instead of the stubbed one.
* - user: sets the user for the connection. If omitted, the root user is used.
*
* reset-last-sampled: resets the last sampled time.
*/
Expand All @@ -65,18 +67,30 @@ func TestTelemetryLoggingDataDriven(t *testing.T) {
ctx := context.Background()
stmtSpy := logtestutils.NewSampledQueryLogScrubVolatileFields(t)
stmtSpy.AddFilter(func(ev logpb.Entry) bool {
return strings.Contains(ev.Message, appName)
return strings.Contains(ev.Message, appName) || strings.Contains(ev.Message, internalConsoleAppName)
})
cleanup := log.InterceptWith(ctx, stmtSpy)
defer cleanup()

txnsSpy := logtestutils.NewSampledTransactionLogScrubVolatileFields(t)
txnsSpy.AddFilter(func(ev logpb.Entry) bool {
return strings.Contains(ev.Message, appName) || strings.Contains(ev.Message, internalConsoleAppName)
})
cleanupTxnSpy := log.InterceptWith(ctx, txnsSpy)
defer cleanupTxnSpy()

datadriven.Walk(t, datapathutils.TestDataPath(t, "telemetry_logging/logging"), func(t *testing.T, path string) {
stmtSpy.Reset()
txnsSpy.Reset()

st := logtestutils.StubTime{}
st.SetTime(timeutil.FromUnixMicros(0))
sts := logtestutils.StubTracingStatus{}
stubTimeOnRestart := int64(0)
telemetryKnobs := &TelemetryLoggingTestingKnobs{
getTimeNow: st.TimeNow,
getTracingStatus: sts.TracingStatus,
}
tc := serverutils.StartCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Expand All @@ -85,10 +99,7 @@ func TestTelemetryLoggingDataDriven(t *testing.T) {
st.SetTime(timeutil.FromUnixMicros(stubTimeOnRestart * 1e6))
},
},
TelemetryLoggingKnobs: &TelemetryLoggingTestingKnobs{
getTimeNow: st.TimeNow,
getTracingStatus: sts.TracingStatus,
},
TelemetryLoggingKnobs: telemetryKnobs,
},
},
})
Expand All @@ -97,12 +108,21 @@ func TestTelemetryLoggingDataDriven(t *testing.T) {

telemetryLogging := s.SQLServer().(*Server).TelemetryLoggingMetrics
setupConn := s.SQLConn(t)
_, err := setupConn.Exec("CREATE USER testuser")
require.NoError(t, err)

spiedConnRootUser := s.SQLConn(t)
spiedConnTestUser := s.SQLConn(t, serverutils.User("testuser"))
spiedConn := spiedConnRootUser

spiedConn := s.SQLConn(t)
_, err := spiedConn.Exec("SET application_name = $1", appName)
// Set spied connections to the app name observed by the log spy.
_, err = spiedConn.Exec("SET application_name = $1", appName)
require.NoError(t, err)
_, err = spiedConnTestUser.Exec("SET application_name = $1", appName)
require.NoError(t, err)

datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
spiedConn = spiedConnRootUser
switch d.Cmd {
case "exec-sql":
sts.SetTracingStatus(false)
Expand All @@ -112,13 +132,22 @@ func TestTelemetryLoggingDataDriven(t *testing.T) {
}
return ""
case "spy-sql":
logCount := stmtSpy.Count()
stmtLogCount := stmtSpy.Count()
txnLogCount := txnsSpy.Count()
var stubTimeUnixSecs float64
var tracing bool
var tracing, useRealTracing bool

d.MaybeScanArgs(t, "tracing", &tracing)
sts.SetTracingStatus(tracing)

d.MaybeScanArgs(t, "useRealTracing", &useRealTracing)
if useRealTracing {
telemetryKnobs.getTracingStatus = nil
defer func() {
telemetryKnobs.getTracingStatus = sts.TracingStatus
}()
}

// Set stubbed stubbed time if this txn is restarted.
scanned := d.MaybeScanArgs(t, "restartUnixSecs", &stubTimeOnRestart)
if !scanned {
Expand All @@ -136,18 +165,38 @@ func TestTelemetryLoggingDataDriven(t *testing.T) {
}
st.SetTime(timeutil.FromUnixMicros(stubTimeMicros))

// Setup the sql user.
user := "root"
d.MaybeScanArgs(t, "user", &user)
switch user {
case "root":
case "testuser":
spiedConn = spiedConnTestUser
}

// Execute query input.
_, err := spiedConn.Exec(d.Input)
var sb strings.Builder

if err != nil {
return err.Error()
sb.WriteString(err.Error())
sb.WriteString("\n")
}

newStmtLogCount := stmtSpy.Count()
sb.WriteString(stmtSpy.GetLastNLogs(newStmtLogCount - stmtLogCount))
if newStmtLogCount > stmtLogCount {
sb.WriteString("\n")
}

// Display any new statement logs have been generated since executing the query.
newLogCount := stmtSpy.Count()
return stmtSpy.GetLastNLogs(newLogCount - logCount)
newTxnLogCount := txnsSpy.Count()
sb.WriteString(txnsSpy.GetLastNLogs(newTxnLogCount - txnLogCount))
return sb.String()
case "reset-last-sampled":
telemetryLogging.resetLastSampledTime()
return ""
case "show-skipped-transactions":
return strconv.FormatUint(telemetryLogging.getSkippedTransactionCount(), 10)
default:
t.Fatal("unknown command")
return ""
Expand Down Expand Up @@ -185,8 +234,9 @@ func TestTelemetryLoggingDataDriven(t *testing.T) {
* shouldEmitTransactionLog: calls shouldEmitTransactionLog with the provided arguments
* Args:
* - unixSecs: stubbed sampling time in unix seconds
* - force: value for the 'force' param
* - isTracing: value for the 'isTracing' param
* - isInternal: value for the 'isInternal' param
* - appName: value for the 'appName' param
*/
func TestTelemetryLoggingDecision(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand All @@ -201,11 +251,12 @@ func TestTelemetryLoggingDecision(t *testing.T) {
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
t.Cleanup(func() {
telemetryLogging.resetLastSampledTime()
telemetryLogging.resetCounters()
})

switch d.Cmd {
case "set-cluster-settings":
var telemLoggingEnabled, internalStmtsEnabled bool
var telemLoggingEnabled, internalStmtsEnabled, consoleQueriesEnabled bool
var samplingMode string
var stmtSampleFreq, txnSampleFreq int
stmtsPerTxnMax := 10
Expand All @@ -215,6 +266,7 @@ func TestTelemetryLoggingDecision(t *testing.T) {
d.MaybeScanArgs(t, "txnSampleFreq", &txnSampleFreq)
d.MaybeScanArgs(t, "stmtsPerTxnMax", &stmtsPerTxnMax)
d.MaybeScanArgs(t, "internalStmtsOn", &internalStmtsEnabled)
d.MaybeScanArgs(t, "telemetryInternalConsoleQueriesEnabled", &consoleQueriesEnabled)

mode := telemetryModeStatement
if samplingMode == "transaction" {
Expand All @@ -226,6 +278,7 @@ func TestTelemetryLoggingDecision(t *testing.T) {
telemetryTransactionSamplingFrequency.Override(ctx, &cs.SV, int64(txnSampleFreq))
telemetryLoggingEnabled.Override(ctx, &cs.SV, telemLoggingEnabled)
telemetryInternalQueriesEnabled.Override(ctx, &cs.SV, internalStmtsEnabled)
telemetryInternalConsoleQueriesEnabled.Override(ctx, &cs.SV, consoleQueriesEnabled)

return ""
case "shouldEmitStatementLog":
Expand All @@ -238,18 +291,20 @@ func TestTelemetryLoggingDecision(t *testing.T) {
d.ScanArgs(t, "force", &force)
st.SetTime(timeutil.FromUnixMicros(int64(unixSecs * 1e6)))

shouldEmit, _ := telemetryLogging.shouldEmitStatementLog(isTrackedTxn, stmtNum, force)
return strconv.FormatBool(shouldEmit)
shouldEmit, skipped := telemetryLogging.shouldEmitStatementLog(isTrackedTxn, stmtNum, force)
return fmt.Sprintf(`emit: %t, skippedQueries: %d`, shouldEmit, skipped)
case "shouldEmitTransactionLog":
var unixSecs float64
var force, isInternal bool
var isTracing, isInternal bool
var appName string
d.ScanArgs(t, "unixSecs", &unixSecs)
d.ScanArgs(t, "force", &force)
d.ScanArgs(t, "isTracing", &isTracing)
d.MaybeScanArgs(t, "isInternal", &isInternal)
st.SetTime(timeutil.FromUnixMicros(int64(unixSecs * 1e6)))
d.ScanArgs(t, "appName", &appName)

shouldEmit := telemetryLogging.shouldEmitTransactionLog(force, isInternal)
return strconv.FormatBool(shouldEmit)
shouldEmit, skipped := telemetryLogging.shouldEmitTransactionLog(isTracing, isInternal, appName)
return fmt.Sprintf(`emit: %t, skippedTxns: %d`, shouldEmit, skipped)
default:
t.Fatal("unknown command")
return ""
Expand Down
Loading

0 comments on commit 9407cbb

Please sign in to comment.