Skip to content

Commit

Permalink
telemetry: allow BEGIN statements to be logged in telemetry transacti…
Browse files Browse the repository at this point in the history
…on mode

Previously, we dropped logging BEGIN statements in telemetry transaction
sampling mode due to BEGIN not having an associated transaction execution
id at the time of logging. For transaction sampling we were using the
transaction execution id to track the transaction through its execution
in order to log all of its statements. Since BEGIN statements did not have
an id, we could not start tracking with BEGIN. This commit enables us to
include BEGIN statements by using a combination of the session id and
session txn counter as the tracking id for telemetry, instead of the
execution id. This allows us to start tracking transactions at BEGIN instead
of at the statement after it.

Part of: cockroachdb#108284

Release note (sql change): Telemetry logging - "transaction" sampling
mode will now log BEGIN statements when they are present in a sampled
transaction.
  • Loading branch information
xinhaoz committed Dec 5, 2023
1 parent 989c37c commit a941c03
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 38 deletions.
4 changes: 3 additions & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3100,7 +3100,9 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent, txnErr err
// If we have a commitTimestamp, we should use it.
ex.previousTransactionCommitTimestamp.Forward(ev.commitTimestamp)
if telemetryLoggingEnabled.Get(&ex.server.cfg.Settings.SV) {
ex.server.TelemetryLoggingMetrics.onTxnFinish(ev.txnID.String())
txnCounter := int(ex.extraTxnState.txnCounter.Load())
txnKey := createTelemetryTransactionID(ex.planner.extendedEvalCtx.SessionID, txnCounter)
ex.server.TelemetryLoggingMetrics.onTxnFinish(txnKey)
}
}
}
Expand Down
42 changes: 29 additions & 13 deletions pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ const (
executorTypeInternal
)

// shouldForceLogStatementType reports whether a statement must always be
// logged to TELEMETRY on the basis of its type alone. BEGIN and COMMIT are
// never force logged; any other statement is force logged exactly when it is
// not of type DML.
func shouldForceLogStatementType(stmtType tree.StatementType, stmtTag string) bool {
	// Transaction boundary statements are excluded regardless of their type.
	if stmtTag == "BEGIN" || stmtTag == "COMMIT" {
		return false
	}
	return stmtType != tree.TypeDML
}

// vLevel maps the executor type to the vmodule log level at which logs
// produced by that executor are written: the executor type's numeric value
// offset by 2.
func (s executorType) vLevel() log.Level {
	const vLevelOffset = 2
	return log.Level(s) + vLevelOffset
}
Expand Down Expand Up @@ -206,6 +218,12 @@ func (p *planner) maybeLogStatementInternal(
sqlErrState = pgerror.GetPGCode(err).String()
}

// See #115610. The txn counter for BEGIN is off by 1.
// We can increment it manually here until the issue is resolved.
if p.stmt.AST.StatementTag() == "BEGIN" {
txnCounter++
}

execDetails := eventpb.CommonSQLExecDetails{
// Note: the current statement, application name, etc, are
// automatically populated by the shared logic in event_log.go.
Expand Down Expand Up @@ -282,25 +300,17 @@ func (p *planner) maybeLogStatementInternal(
// the last event emission.
tracingEnabled := telemetryMetrics.isTracing(p.curPlan.instrumentation.Tracing())

isStmtMode := telemetrySamplingMode.Get(&p.execCfg.Settings.SV) == telemetryModeStatement

// Always sample if one of the scenarios is true:
// - on 'statement' sampling and the current statement is not of type DML
// - on 'transaction' sampling mode and the current statement is not of type DML and is not a COMMIT
// - statement is not of type DML and is not BEGIN or COMMIT
// - tracing is enabled for this statement
// - this is a query emitted by our console (application_name starts with `$ internal-console`) and
// the cluster setting to log console queries is enabled
forceLog := (p.stmt.AST.StatementType() != tree.TypeDML &&
(isStmtMode || p.stmt.AST.StatementTag() != "COMMIT")) ||
forceLog := shouldForceLogStatementType(p.stmt.AST.StatementType(), p.stmt.AST.StatementTag()) ||
tracingEnabled || logConsoleQuery

var txnID string
// p.txn can be nil for COPY.
if p.txn != nil {
txnID = p.txn.ID().String()
}

if telemetryMetrics.shouldEmitStatementLog(telemetryMetrics.timeNow(), txnID, forceLog, stmtCount) {
txnTrackingID := createTelemetryTransactionID(p.extendedEvalCtx.SessionID, txnCounter)
isFirstStmt := stmtCount == 0 || (p.extendedEvalCtx.Context.TxnImplicit && stmtCount == 1)
if telemetryMetrics.shouldEmitStatementLog(telemetryMetrics.timeNow(), txnTrackingID, forceLog, isFirstStmt) {
var queryLevelStats execstats.QueryLevelStats
if stats, ok := p.instrumentation.GetQueryLevelStats(); ok {
queryLevelStats = *stats
Expand All @@ -312,6 +322,12 @@ func (p *planner) maybeLogStatementInternal(
indexRecs = append(indexRecs, rec.SQL)
}

var txnID string
// p.txn can be nil for COPY.
if p.txn != nil {
txnID = p.txn.ID().String()
}

phaseTimes := statsCollector.PhaseTimes()

// Collect the statistics.
Expand Down
44 changes: 25 additions & 19 deletions pkg/sql/telemetry_logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ package sql

import (
"context"
"strconv"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -86,6 +88,18 @@ var telemetryTrackedTxnsLimit = settings.RegisterIntSetting(
settings.WithPublic,
)

// telemetryTransactionID identifies a transaction for the purposes of
// transaction telemetry logging, where the transaction must be tracked
// through its execution. The transaction execution id can't be used for this
// since it is not available for BEGIN statements, so the id is derived from
// the session id and the txn counter instead.
type telemetryTransactionID string

// createTelemetryTransactionID builds the telemetry tracking id for a
// transaction by joining the string form of the session id and the decimal
// txn counter with a "/" separator.
func createTelemetryTransactionID(
	sessionID clusterunique.ID, txnCounter int,
) telemetryTransactionID {
	sessionPart := sessionID.String()
	counterPart := strconv.Itoa(txnCounter)
	return telemetryTransactionID(sessionPart + "/" + counterPart)
}

// TelemetryLoggingMetrics keeps track of the last time at which an event
// was logged to the telemetry channel, and the number of skipped queries
// since the last logged event.
Expand All @@ -101,7 +115,7 @@ type TelemetryLoggingMetrics struct {
// being logged. When the sampling mode is set to txns, we must ensure we
// log all stmts for a txn. Txns are removed upon completing execution when
// all events have been captured.
observedTxnExecutions map[string]interface{}
observedTxnExecutions map[telemetryTransactionID]interface{}
}

Knobs *TelemetryLoggingTestingKnobs
Expand All @@ -114,7 +128,7 @@ func newTelemetryLoggingMetrics(
knobs *TelemetryLoggingTestingKnobs, st *cluster.Settings,
) *TelemetryLoggingMetrics {
t := TelemetryLoggingMetrics{Knobs: knobs, st: st}
t.mu.observedTxnExecutions = make(map[string]interface{})
t.mu.observedTxnExecutions = make(map[telemetryTransactionID]interface{})
return &t
}

Expand Down Expand Up @@ -154,12 +168,12 @@ func (t *TelemetryLoggingMetrics) registerOnTelemetrySamplingModeChange(
defer t.mu.Unlock()
if mode == telemetryModeStatement {
// Clear currently observed txns.
t.mu.observedTxnExecutions = make(map[string]interface{})
t.mu.observedTxnExecutions = make(map[telemetryTransactionID]interface{})
}
})
}

func (t *TelemetryLoggingMetrics) onTxnFinish(txnExecutionID string) {
func (t *TelemetryLoggingMetrics) onTxnFinish(txnID telemetryTransactionID) {
if telemetrySamplingMode.Get(&t.st.SV) != telemetryModeTransaction {
return
}
Expand All @@ -169,7 +183,7 @@ func (t *TelemetryLoggingMetrics) onTxnFinish(txnExecutionID string) {
func() {
t.mu.RLock()
defer t.mu.RUnlock()
_, exists = t.mu.observedTxnExecutions[txnExecutionID]
_, exists = t.mu.observedTxnExecutions[txnID]
}()

if !exists {
Expand All @@ -178,7 +192,7 @@ func (t *TelemetryLoggingMetrics) onTxnFinish(txnExecutionID string) {

t.mu.Lock()
defer t.mu.Unlock()
delete(t.mu.observedTxnExecutions, txnExecutionID)
delete(t.mu.observedTxnExecutions, txnID)
}

func (t *TelemetryLoggingMetrics) getTrackedTxnsCount() int {
Expand Down Expand Up @@ -206,31 +220,23 @@ func (t *TelemetryLoggingMetrics) timeNow() time.Time {
// - The telemetry mode is set to "statement" AND the required amount of time has elapsed
// - The txn is not being tracked and the stmt is being forced to log.
func (t *TelemetryLoggingMetrics) shouldEmitStatementLog(
newTime time.Time, txnExecutionID string, force bool, stmtPosInTxn int,
newTime time.Time, txnID telemetryTransactionID, force bool, isFirstStmt bool,
) (shouldEmit bool) {
maxEventFrequency := TelemetryMaxStatementEventFrequency.Get(&t.st.SV)
requiredTimeElapsed := time.Second / time.Duration(maxEventFrequency)
isTxnMode := telemetrySamplingMode.Get(&t.st.SV) == telemetryModeTransaction
txnsLimit := int(telemetryTrackedTxnsLimit.Get(&t.st.SV))

if isTxnMode && txnExecutionID == "" {
// If we are in transaction mode, skip logging statements without txn ids
// since we won't be able to track the stmt's txn through its execution.
// This will skip statements like BEGIN which don't have an associated
// transaction id.
return false
}

var enoughTimeElapsed, txnIsTracked, startTrackingTxn bool
// Avoid taking the full lock if we don't have to.
func() {
t.mu.RLock()
defer t.mu.RUnlock()

enoughTimeElapsed = newTime.Sub(t.mu.lastEmittedTime) >= requiredTimeElapsed
startTrackingTxn = isTxnMode && txnExecutionID != "" &&
stmtPosInTxn == 1 && len(t.mu.observedTxnExecutions) < txnsLimit
_, txnIsTracked = t.mu.observedTxnExecutions[txnExecutionID]
startTrackingTxn = isTxnMode && txnID != "" &&
isFirstStmt && len(t.mu.observedTxnExecutions) < txnsLimit
_, txnIsTracked = t.mu.observedTxnExecutions[txnID]
}()

if txnIsTracked || (!force && (!enoughTimeElapsed || (isTxnMode && !startTrackingTxn))) {
Expand All @@ -253,7 +259,7 @@ func (t *TelemetryLoggingMetrics) shouldEmitStatementLog(

// We could be forcing the log so we should only track its txn if it meets the criteria.
if startTrackingTxn && !txnLimitReached {
t.mu.observedTxnExecutions[txnExecutionID] = struct{}{}
t.mu.observedTxnExecutions[txnID] = struct{}{}
}

t.mu.lastEmittedTime = newTime
Expand Down
26 changes: 21 additions & 5 deletions pkg/sql/telemetry_logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1711,7 +1711,7 @@ func TestTelemetryLoggingTxnMode(t *testing.T) {
queries := []testQuery{
{
query: "BEGIN; TRUNCATE t; COMMIT",
logTime: 0, // Logged.
logTime: 0.1, // Logged.
},
{
query: "BEGIN; SELECT 1; SELECT 2; SELECT 3; COMMIT",
Expand Down Expand Up @@ -1749,17 +1749,25 @@ func TestTelemetryLoggingTxnMode(t *testing.T) {
}

expectedLogs := []expectedLog{
{
logMsg: `BEGIN TRANSACTION`,
skippedQueryCount: 0,
},
{
logMsg: `TRUNCATE TABLE defaultdb.public.t`,
skippedQueryCount: 1, // Skipped BEGIN.
skippedQueryCount: 0,
},
{
logMsg: `COMMIT TRANSACTION`,
skippedQueryCount: 0,
},
{
logMsg: `BEGIN TRANSACTION`,
skippedQueryCount: 0,
},
{
logMsg: `SELECT ‹1›`,
skippedQueryCount: 1, // BEGIN skipped.
skippedQueryCount: 0,
},
{
logMsg: `SELECT ‹2›`,
Expand All @@ -1781,9 +1789,13 @@ func TestTelemetryLoggingTxnMode(t *testing.T) {
logMsg: `SELECT * FROM ""."".t LIMIT ‹2›`,
skippedQueryCount: 2,
},
{
logMsg: `BEGIN TRANSACTION`,
skippedQueryCount: 3,
},
{
logMsg: `SELECT * FROM ""."".t LIMIT ‹4›`,
skippedQueryCount: 4,
skippedQueryCount: 0,
},
{
logMsg: `SELECT * FROM ""."".t LIMIT ‹5›`,
Expand Down Expand Up @@ -2059,9 +2071,13 @@ func TestTelemetryLoggingRespectsTxnLimit(t *testing.T) {
}

expectedLogs := []expectedLog{
{
logMsg: `BEGIN TRANSACTION`,
skippedQueryCount: 0,
},
{
logMsg: `SELECT ‹1›`,
skippedQueryCount: 2, // Skipped both BEGIN queries.
skippedQueryCount: 1, // Skipped BEGIN in other transaction.
},
{
logMsg: `SELECT ‹1›, ‹1›`,
Expand Down

0 comments on commit a941c03

Please sign in to comment.