Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: introduce Transaction ID Cache #74115

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ server.web_session.purge.max_deletions_per_cycle integer 10 the maximum number o
server.web_session.purge.period duration 1h0m0s the time until old sessions are deleted
server.web_session.purge.ttl duration 1h0m0s if nonzero, entries in system.web_sessions older than this duration are periodically purged
server.web_session_timeout duration 168h0m0s the duration that a newly created web session will be valid
sql.contention.txn_id_cache.max_size byte size 64 MiB the maximum byte size TxnID cache will use
sql.cross_db_fks.enabled boolean false if true, creating foreign key references across databases is allowed
sql.cross_db_sequence_owners.enabled boolean false if true, creating sequences owned by tables from other databases is allowed
sql.cross_db_sequence_references.enabled boolean false if true, sequences referenced by tables from other databases are allowed
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
<tr><td><code>server.web_session.purge.period</code></td><td>duration</td><td><code>1h0m0s</code></td><td>the time until old sessions are deleted</td></tr>
<tr><td><code>server.web_session.purge.ttl</code></td><td>duration</td><td><code>1h0m0s</code></td><td>if nonzero, entries in system.web_sessions older than this duration are periodically purged</td></tr>
<tr><td><code>server.web_session_timeout</code></td><td>duration</td><td><code>168h0m0s</code></td><td>the duration that a newly created web session will be valid</td></tr>
<tr><td><code>sql.contention.txn_id_cache.max_size</code></td><td>byte size</td><td><code>64 MiB</code></td><td>the maximum byte size TxnID cache will use</td></tr>
<tr><td><code>sql.cross_db_fks.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, creating foreign key references across databases is allowed</td></tr>
<tr><td><code>sql.cross_db_sequence_owners.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, creating sequences owned by tables from other databases is allowed</td></tr>
<tr><td><code>sql.cross_db_sequence_references.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if true, sequences referenced by tables from other databases are allowed</td></tr>
Expand Down
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ ALL_TESTS = [
"//pkg/sql/colflow/colrpc:colrpc_test",
"//pkg/sql/colflow:colflow_test",
"//pkg/sql/colmem:colmem_test",
"//pkg/sql/contention/txnidcache:txnidcache_test",
"//pkg/sql/contention:contention_test",
"//pkg/sql/covering:covering_test",
"//pkg/sql/distsql:distsql_test",
Expand Down
3 changes: 3 additions & 0 deletions pkg/roachpb/app_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func ConstructStatementFingerprintID(
// individual statement fingerprint IDs that comprise the transaction.
type TransactionFingerprintID uint64

// InvalidTransactionFingerprintID denotes an invalid transaction fingerprint ID.
const InvalidTransactionFingerprintID = TransactionFingerprintID(0)

// Size returns the size of the TransactionFingerprintID.
func (t TransactionFingerprintID) Size() int64 {
return 8
Expand Down
7 changes: 4 additions & 3 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ go_library(
"zone_config.go",
":gen-advancecode-stringer", # keep
":gen-nodestatus-stringer", # keep
":gen-txnevent-stringer", # keep
":gen-txneventtype-stringer", # keep
":gen-txntype-stringer", # keep
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql",
Expand Down Expand Up @@ -311,6 +311,7 @@ go_library(
"//pkg/sql/colexec",
"//pkg/sql/colflow",
"//pkg/sql/contention",
"//pkg/sql/contention/txnidcache",
"//pkg/sql/covering",
"//pkg/sql/delegate",
"//pkg/sql/descmetadata",
Expand Down Expand Up @@ -701,9 +702,9 @@ go_test(
)

stringer(
name = "gen-txnevent-stringer",
name = "gen-txneventtype-stringer",
src = "txn_state.go",
typ = "txnEvent",
typ = "txnEventType",
)

stringer(
Expand Down
47 changes: 37 additions & 10 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/idxusage"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand Down Expand Up @@ -279,6 +280,10 @@ type Server struct {
// node as gateway node.
indexUsageStats *idxusage.LocalIndexUsageStats

// txnIDCache stores the mapping from transaction ID to transaction
// fingerprint IDs for all recently executed transactions.
txnIDCache *txnidcache.Cache

// Metrics is used to account normal queries.
Metrics Metrics

Expand Down Expand Up @@ -319,6 +324,10 @@ type Metrics struct {
type ServerMetrics struct {
// StatsMetrics contains metrics for SQL statistics collection.
StatsMetrics StatsMetrics

// ContentionSubsystemMetrics contains metrics related to contention
// subsystem.
ContentionSubsystemMetrics txnidcache.Metrics
}

// NewServer creates a new Server. Start() needs to be called before the Server
Expand Down Expand Up @@ -361,6 +370,9 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
ChannelSize: idxusage.DefaultChannelSize,
Setting: cfg.Settings,
}),
txnIDCache: txnidcache.NewTxnIDCache(
cfg.Settings,
&serverMetrics.ContentionSubsystemMetrics),
}

telemetryLoggingMetrics := &TelemetryLoggingMetrics{}
Expand Down Expand Up @@ -451,6 +463,7 @@ func makeServerMetrics(cfg *ExecutorConfig) ServerMetrics {
MetaSQLTxnStatsCollectionOverhead, 6*metricsSampleInterval,
),
},
ContentionSubsystemMetrics: txnidcache.NewMetrics(),
}
}

Expand All @@ -462,6 +475,8 @@ func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) {
// accumulated in the reporter when the telemetry server fails.
// Usually it is telemetry's reporter's job to clear the reporting SQL Stats.
s.reportedStats.Start(ctx, stopper)

s.txnIDCache.Start(ctx, stopper)
}

// GetSQLStatsController returns the persistedsqlstats.Controller for current
Expand All @@ -487,6 +502,11 @@ func (s *Server) GetReportedSQLStatsController() *sslocal.Controller {
return s.reportedStatsController
}

// GetTxnIDCache returns the txnidcache.Cache for the current sql.Server.
func (s *Server) GetTxnIDCache() *txnidcache.Cache {
return s.txnIDCache
}

// GetScrubbedStmtStats returns the statement statistics by app, with the
// queries scrubbed of their identifiers. Any statements which cannot be
// scrubbed will be omitted from the returned map.
Expand Down Expand Up @@ -814,6 +834,7 @@ func (s *Server) newConnExecutor(
hasCreatedTemporarySchema: false,
stmtDiagnosticsRecorder: s.cfg.StmtDiagnosticsRecorder,
indexUsageStats: s.indexUsageStats,
txnIDCacheWriter: s.txnIDCache,
}

ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount
Expand Down Expand Up @@ -985,9 +1006,9 @@ func (ex *connExecutor) closeWrapper(ctx context.Context, recovered interface{})
func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.sessionEventf(ctx, "finishing connExecutor")

txnEv := noEvent
txnEvType := noEvent
if _, noTxn := ex.machine.CurState().(stateNoTxn); !noTxn {
txnEv = txnRollback
txnEvType = txnRollback
}

if closeType == normalClose {
Expand Down Expand Up @@ -1019,7 +1040,7 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.state.finishExternalTxn()
}

if err := ex.resetExtraTxnState(ctx, txnEv); err != nil {
if err := ex.resetExtraTxnState(ctx, txnEvent{eventType: txnEvType}); err != nil {
log.Warningf(ctx, "error while cleaning up connExecutor: %s", err)
}

Expand Down Expand Up @@ -1401,6 +1422,10 @@ type connExecutor struct {

// indexUsageStats is used to track index usage stats.
indexUsageStats *idxusage.LocalIndexUsageStats

// txnIDCacheWriter is used to write txnidcache.ResolvedTxnID to the
// Transaction ID Cache.
txnIDCacheWriter txnidcache.Writer
}

// ctxHolder contains a connection's context and, while session tracing is
Expand Down Expand Up @@ -1516,7 +1541,9 @@ func (ns *prepStmtNamespace) resetTo(
}

// resetExtraTxnState resets the fields of ex.extraTxnState when a transaction
// commits, rolls back or restarts.
// finishes execution (either commits, rollbacks or restarts). Based on the
// transaction event, resetExtraTxnState invokes corresponding callbacks
// (e.g. onTxnFinish() and onTxnRestart()).
func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) error {
ex.extraTxnState.jobs = nil
ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{}
Expand All @@ -1536,7 +1563,7 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) err
delete(ex.extraTxnState.prepStmtsNamespace.portals, name)
}

switch ev {
switch ev.eventType {
case txnCommit, txnRollback:
for name, p := range ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.portals {
p.close(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
Expand Down Expand Up @@ -2046,7 +2073,7 @@ func (ex *connExecutor) updateTxnRewindPosMaybe(
if _, ok := ex.machine.CurState().(stateOpen); !ok {
return nil
}
if advInfo.txnEvent == txnStart || advInfo.txnEvent == txnRestart {
if advInfo.txnEvent.eventType == txnStart || advInfo.txnEvent.eventType == txnRestart {
var nextPos CmdPos
switch advInfo.code {
case stayInPlace:
Expand Down Expand Up @@ -2228,7 +2255,7 @@ func (ex *connExecutor) execCopyIn(
} else {
txnOpt = copyTxnOpt{
resetExtraTxnState: func(ctx context.Context) error {
return ex.resetExtraTxnState(ctx, noEvent)
return ex.resetExtraTxnState(ctx, txnEvent{eventType: noEvent})
},
}
}
Expand Down Expand Up @@ -2680,7 +2707,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
}

// Handle transaction events which cause updates to txnState.
switch advInfo.txnEvent {
switch advInfo.txnEvent.eventType {
case noEvent:
_, nextStateIsAborted := ex.machine.CurState().(stateAborted)
// Update the deadline on the transaction based on the collections,
Expand All @@ -2696,7 +2723,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
case txnStart:
ex.extraTxnState.autoRetryCounter = 0
ex.extraTxnState.autoRetryReason = nil
ex.recordTransactionStart()
ex.recordTransactionStart(advInfo.txnEvent.txnID)
// Bump the txn counter for logging.
ex.extraTxnState.txnCounter++
if !ex.server.cfg.Codec.ForSystemTenant() {
Expand All @@ -2719,7 +2746,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
err := errorutil.UnexpectedWithIssueErrorf(
26687,
"programming error: non-error event %s generated even though res.Err() has been set to: %s",
errors.Safe(advInfo.txnEvent.String()),
errors.Safe(advInfo.txnEvent.eventType.String()),
res.Err())
log.Errorf(ex.Ctx(), "%v", err)
errorutil.SendReport(ex.Ctx(), &ex.server.cfg.Settings.SV, err)
Expand Down
107 changes: 64 additions & 43 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain"
Expand All @@ -51,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/lib/pq/oid"
)
Expand Down Expand Up @@ -1880,46 +1882,6 @@ func payloadHasError(payload fsm.EventPayload) bool {
return hasErr
}

// recordTransactionStart records the start of the transaction.
func (ex *connExecutor) recordTransactionStart() {
ex.state.mu.RLock()
txnStart := ex.state.mu.txnStart
ex.state.mu.RUnlock()
implicit := ex.implicitTxn()

// Transaction received time is the time at which the statement that prompted
// the creation of this transaction was received.
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionTransactionReceived,
ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionQueryReceived))
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionFirstStartExecTransaction, timeutil.Now())
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionMostRecentStartExecTransaction,
ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionFirstStartExecTransaction))
ex.extraTxnState.transactionStatementsHash = util.MakeFNV64()
ex.extraTxnState.transactionStatementFingerprintIDs = nil
ex.extraTxnState.numRows = 0
ex.extraTxnState.shouldCollectTxnExecutionStats = false
ex.extraTxnState.accumulatedStats = execstats.QueryLevelStats{}
ex.extraTxnState.rowsRead = 0
ex.extraTxnState.bytesRead = 0
ex.extraTxnState.rowsWritten = 0
ex.extraTxnState.rowsWrittenLogged = false
ex.extraTxnState.rowsReadLogged = false
if txnExecStatsSampleRate := collectTxnStatsSampleRate.Get(&ex.server.GetExecutorConfig().Settings.SV); txnExecStatsSampleRate > 0 {
ex.extraTxnState.shouldCollectTxnExecutionStats = txnExecStatsSampleRate > ex.rng.Float64()
}

ex.metrics.EngineMetrics.SQLTxnsOpen.Inc(1)

ex.extraTxnState.shouldExecuteOnTxnFinish = true
ex.extraTxnState.txnFinishClosure.txnStartTime = txnStart
ex.extraTxnState.txnFinishClosure.implicit = implicit
ex.extraTxnState.shouldExecuteOnTxnRestart = true

if !implicit {
ex.statsCollector.StartExplicitTransaction()
}
}

func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) {
if ex.extraTxnState.shouldExecuteOnTxnFinish {
ex.extraTxnState.shouldExecuteOnTxnFinish = false
Expand All @@ -1934,7 +1896,14 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) {
transactionFingerprintID,
)
}
err := ex.recordTransaction(ctx, transactionFingerprintID, ev, implicit, txnStart)
if ex.server.cfg.TestingKnobs.BeforeTxnStatsRecorded != nil {
ex.server.cfg.TestingKnobs.BeforeTxnStatsRecorded(
ex.sessionData(),
ev.txnID,
transactionFingerprintID,
)
}
err := ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart)
if err != nil {
if log.V(1) {
log.Warningf(ctx, "failed to record transaction stats: %s", err)
Expand Down Expand Up @@ -1963,7 +1932,54 @@ func (ex *connExecutor) onTxnRestart() {
}
}

func (ex *connExecutor) recordTransaction(
// recordTransactionStart records the start of the transaction.
func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) {
// Transaction fingerprint ID will be available once transaction finishes
// execution.
ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{
TxnID: txnID,
TxnFingerprintID: roachpb.InvalidTransactionFingerprintID,
})

ex.state.mu.RLock()
txnStart := ex.state.mu.txnStart
ex.state.mu.RUnlock()
implicit := ex.implicitTxn()

// Transaction received time is the time at which the statement that prompted
// the creation of this transaction was received.
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionTransactionReceived,
ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionQueryReceived))
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionFirstStartExecTransaction, timeutil.Now())
ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionMostRecentStartExecTransaction,
ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionFirstStartExecTransaction))
ex.extraTxnState.transactionStatementsHash = util.MakeFNV64()
ex.extraTxnState.transactionStatementFingerprintIDs = nil
ex.extraTxnState.numRows = 0
ex.extraTxnState.shouldCollectTxnExecutionStats = false
ex.extraTxnState.accumulatedStats = execstats.QueryLevelStats{}
ex.extraTxnState.rowsRead = 0
ex.extraTxnState.bytesRead = 0
ex.extraTxnState.rowsWritten = 0
ex.extraTxnState.rowsWrittenLogged = false
ex.extraTxnState.rowsReadLogged = false
if txnExecStatsSampleRate := collectTxnStatsSampleRate.Get(&ex.server.GetExecutorConfig().Settings.SV); txnExecStatsSampleRate > 0 {
ex.extraTxnState.shouldCollectTxnExecutionStats = txnExecStatsSampleRate > ex.rng.Float64()
}

ex.metrics.EngineMetrics.SQLTxnsOpen.Inc(1)

ex.extraTxnState.shouldExecuteOnTxnFinish = true
ex.extraTxnState.txnFinishClosure.txnStartTime = txnStart
ex.extraTxnState.txnFinishClosure.implicit = implicit
ex.extraTxnState.shouldExecuteOnTxnRestart = true

if !implicit {
ex.statsCollector.StartExplicitTransaction()
}
}

func (ex *connExecutor) recordTransactionFinish(
ctx context.Context,
transactionFingerprintID roachpb.TransactionFingerprintID,
ev txnEvent,
Expand Down Expand Up @@ -1991,13 +2007,18 @@ func (ex *connExecutor) recordTransaction(
ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1)
ex.metrics.EngineMetrics.SQLTxnLatency.RecordValue(txnTime.Nanoseconds())

ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{
TxnID: ev.txnID,
TxnFingerprintID: transactionFingerprintID,
})

txnServiceLat := ex.phaseTimes.GetTransactionServiceLatency()
txnRetryLat := ex.phaseTimes.GetTransactionRetryLatency()
commitLat := ex.phaseTimes.GetCommitLatency()

recordedTxnStats := sqlstats.RecordedTxnStats{
TransactionTimeSec: txnTime.Seconds(),
Committed: ev == txnCommit,
Committed: ev.eventType == txnCommit,
ImplicitTxn: implicit,
RetryCount: int64(ex.extraTxnState.autoRetryCounter),
StatementFingerprintIDs: ex.extraTxnState.transactionStatementFingerprintIDs,
Expand Down
Loading