diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 115a3027bc5b..409b4d010477 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -81,6 +81,7 @@ server.web_session.purge.max_deletions_per_cycle integer 10 the maximum number o
server.web_session.purge.period duration 1h0m0s the time until old sessions are deleted
server.web_session.purge.ttl duration 1h0m0s if nonzero, entries in system.web_sessions older than this duration are periodically purged
server.web_session_timeout duration 168h0m0s the duration that a newly created web session will be valid
+sql.contention.txn_id_cache.max_size byte size 64 MiB the maximum byte size TxnID cache will use
sql.cross_db_fks.enabled boolean false if true, creating foreign key references across databases is allowed
sql.cross_db_sequence_owners.enabled boolean false if true, creating sequences owned by tables from other databases is allowed
sql.cross_db_sequence_references.enabled boolean false if true, sequences referenced by tables from other databases are allowed
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 3ca78510e519..f63361501995 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -88,6 +88,7 @@
server.web_session.purge.period | duration | 1h0m0s | the time until old sessions are deleted |
server.web_session.purge.ttl | duration | 1h0m0s | if nonzero, entries in system.web_sessions older than this duration are periodically purged |
server.web_session_timeout | duration | 168h0m0s | the duration that a newly created web session will be valid |
+sql.contention.txn_id_cache.max_size | byte size | 64 MiB | the maximum byte size TxnID cache will use |
sql.cross_db_fks.enabled | boolean | false | if true, creating foreign key references across databases is allowed |
sql.cross_db_sequence_owners.enabled | boolean | false | if true, creating sequences owned by tables from other databases is allowed |
sql.cross_db_sequence_references.enabled | boolean | false | if true, sequences referenced by tables from other databases are allowed |
diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel
index 0725b8bcc394..eed4b0827394 100644
--- a/pkg/BUILD.bazel
+++ b/pkg/BUILD.bazel
@@ -238,6 +238,7 @@ ALL_TESTS = [
"//pkg/sql/colflow/colrpc:colrpc_test",
"//pkg/sql/colflow:colflow_test",
"//pkg/sql/colmem:colmem_test",
+ "//pkg/sql/contention/txnidcache:txnidcache_test",
"//pkg/sql/contention:contention_test",
"//pkg/sql/covering:covering_test",
"//pkg/sql/distsql:distsql_test",
diff --git a/pkg/roachpb/app_stats.go b/pkg/roachpb/app_stats.go
index 2722c831ae44..e5c6f02187b1 100644
--- a/pkg/roachpb/app_stats.go
+++ b/pkg/roachpb/app_stats.go
@@ -60,6 +60,9 @@ func ConstructStatementFingerprintID(
// individual statement fingerprint IDs that comprise the transaction.
type TransactionFingerprintID uint64
+// InvalidTransactionFingerprintID denotes an invalid transaction fingerprint ID.
+const InvalidTransactionFingerprintID = TransactionFingerprintID(0)
+
// Size returns the size of the TransactionFingerprintID.
func (t TransactionFingerprintID) Size() int64 {
return 8
diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel
index 1aa07cb9d66d..cfe7be587e88 100644
--- a/pkg/sql/BUILD.bazel
+++ b/pkg/sql/BUILD.bazel
@@ -238,7 +238,7 @@ go_library(
"zone_config.go",
":gen-advancecode-stringer", # keep
":gen-nodestatus-stringer", # keep
- ":gen-txnevent-stringer", # keep
+ ":gen-txneventtype-stringer", # keep
":gen-txntype-stringer", # keep
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql",
@@ -311,6 +311,7 @@ go_library(
"//pkg/sql/colexec",
"//pkg/sql/colflow",
"//pkg/sql/contention",
+ "//pkg/sql/contention/txnidcache",
"//pkg/sql/covering",
"//pkg/sql/delegate",
"//pkg/sql/descmetadata",
@@ -701,9 +702,9 @@ go_test(
)
stringer(
- name = "gen-txnevent-stringer",
+ name = "gen-txneventtype-stringer",
src = "txn_state.go",
- typ = "txnEvent",
+ typ = "txnEventType",
)
stringer(
diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go
index f0055419269d..cc7dd699f1ae 100644
--- a/pkg/sql/conn_executor.go
+++ b/pkg/sql/conn_executor.go
@@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
+ "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/idxusage"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
@@ -279,6 +280,10 @@ type Server struct {
// node as gateway node.
indexUsageStats *idxusage.LocalIndexUsageStats
+ // txnIDCache stores the mapping from transaction ID to transaction
+ // fingerprint IDs for all recently executed transactions.
+ txnIDCache *txnidcache.Cache
+
// Metrics is used to account normal queries.
Metrics Metrics
@@ -319,6 +324,10 @@ type Metrics struct {
type ServerMetrics struct {
// StatsMetrics contains metrics for SQL statistics collection.
StatsMetrics StatsMetrics
+
+ // ContentionSubsystemMetrics contains metrics related to contention
+ // subsystem.
+ ContentionSubsystemMetrics txnidcache.Metrics
}
// NewServer creates a new Server. Start() needs to be called before the Server
@@ -361,6 +370,9 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
ChannelSize: idxusage.DefaultChannelSize,
Setting: cfg.Settings,
}),
+ txnIDCache: txnidcache.NewTxnIDCache(
+ cfg.Settings,
+ &serverMetrics.ContentionSubsystemMetrics),
}
telemetryLoggingMetrics := &TelemetryLoggingMetrics{}
@@ -451,6 +463,7 @@ func makeServerMetrics(cfg *ExecutorConfig) ServerMetrics {
MetaSQLTxnStatsCollectionOverhead, 6*metricsSampleInterval,
),
},
+ ContentionSubsystemMetrics: txnidcache.NewMetrics(),
}
}
@@ -462,6 +475,8 @@ func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) {
// accumulated in the reporter when the telemetry server fails.
// Usually it is telemetry's reporter's job to clear the reporting SQL Stats.
s.reportedStats.Start(ctx, stopper)
+
+ s.txnIDCache.Start(ctx, stopper)
}
// GetSQLStatsController returns the persistedsqlstats.Controller for current
@@ -487,6 +502,11 @@ func (s *Server) GetReportedSQLStatsController() *sslocal.Controller {
return s.reportedStatsController
}
+// GetTxnIDCache returns the txnidcache.Cache for the current sql.Server.
+func (s *Server) GetTxnIDCache() *txnidcache.Cache {
+ return s.txnIDCache
+}
+
// GetScrubbedStmtStats returns the statement statistics by app, with the
// queries scrubbed of their identifiers. Any statements which cannot be
// scrubbed will be omitted from the returned map.
@@ -814,6 +834,7 @@ func (s *Server) newConnExecutor(
hasCreatedTemporarySchema: false,
stmtDiagnosticsRecorder: s.cfg.StmtDiagnosticsRecorder,
indexUsageStats: s.indexUsageStats,
+ txnIDCacheWriter: s.txnIDCache,
}
ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount
@@ -985,9 +1006,9 @@ func (ex *connExecutor) closeWrapper(ctx context.Context, recovered interface{})
func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.sessionEventf(ctx, "finishing connExecutor")
- txnEv := noEvent
+ txnEvType := noEvent
if _, noTxn := ex.machine.CurState().(stateNoTxn); !noTxn {
- txnEv = txnRollback
+ txnEvType = txnRollback
}
if closeType == normalClose {
@@ -1019,7 +1040,7 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
ex.state.finishExternalTxn()
}
- if err := ex.resetExtraTxnState(ctx, txnEv); err != nil {
+ if err := ex.resetExtraTxnState(ctx, txnEvent{eventTyp: txnEvType}); err != nil {
log.Warningf(ctx, "error while cleaning up connExecutor: %s", err)
}
@@ -1401,6 +1422,10 @@ type connExecutor struct {
// indexUsageStats is used to track index usage stats.
indexUsageStats *idxusage.LocalIndexUsageStats
+
+ // txnIDCacheWriter is used to write txnidcache.ResolvedTxnID to the
+ // Transaction ID Cache.
+ txnIDCacheWriter txnidcache.Writer
}
// ctxHolder contains a connection's context and, while session tracing is
@@ -1516,7 +1541,9 @@ func (ns *prepStmtNamespace) resetTo(
}
// resetExtraTxnState resets the fields of ex.extraTxnState when a transaction
-// commits, rolls back or restarts.
+// finishes execution (either commits, rollbacks or restarts). Based on the
+// transaction event, resetExtraTxnState invokes corresponding callbacks
+// (e.g. onTxnFinish() and onTxnRestart()).
func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) error {
ex.extraTxnState.jobs = nil
ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{}
@@ -1536,7 +1563,7 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) err
delete(ex.extraTxnState.prepStmtsNamespace.portals, name)
}
- switch ev {
+ switch ev.eventTyp {
case txnCommit, txnRollback:
for name, p := range ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.portals {
p.close(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
@@ -2046,7 +2073,7 @@ func (ex *connExecutor) updateTxnRewindPosMaybe(
if _, ok := ex.machine.CurState().(stateOpen); !ok {
return nil
}
- if advInfo.txnEvent == txnStart || advInfo.txnEvent == txnRestart {
+ if advInfo.txnEvent.eventTyp == txnStart || advInfo.txnEvent.eventTyp == txnRestart {
var nextPos CmdPos
switch advInfo.code {
case stayInPlace:
@@ -2228,7 +2255,7 @@ func (ex *connExecutor) execCopyIn(
} else {
txnOpt = copyTxnOpt{
resetExtraTxnState: func(ctx context.Context) error {
- return ex.resetExtraTxnState(ctx, noEvent)
+ return ex.resetExtraTxnState(ctx, txnEvent{eventTyp: noEvent})
},
}
}
@@ -2680,7 +2707,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
}
// Handle transaction events which cause updates to txnState.
- switch advInfo.txnEvent {
+ switch advInfo.txnEvent.eventTyp {
case noEvent:
_, nextStateIsAborted := ex.machine.CurState().(stateAborted)
// Update the deadline on the transaction based on the collections,
@@ -2696,7 +2723,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
case txnStart:
ex.extraTxnState.autoRetryCounter = 0
ex.extraTxnState.autoRetryReason = nil
- ex.recordTransactionStart()
+ ex.recordTransactionStart(advInfo.txnEvent.txnID)
// Bump the txn counter for logging.
ex.extraTxnState.txnCounter++
if !ex.server.cfg.Codec.ForSystemTenant() {
@@ -2719,7 +2746,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
err := errorutil.UnexpectedWithIssueErrorf(
26687,
"programming error: non-error event %s generated even though res.Err() has been set to: %s",
- errors.Safe(advInfo.txnEvent.String()),
+ errors.Safe(advInfo.txnEvent.eventTyp.String()),
res.Err())
log.Errorf(ex.Ctx(), "%v", err)
errorutil.SendReport(ex.Ctx(), &ex.server.cfg.Settings.SV, err)
diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index be3f4bde3ae2..54821c841bd6 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
+ "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/execstats"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain"
@@ -51,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/lib/pq/oid"
)
@@ -1881,7 +1883,14 @@ func payloadHasError(payload fsm.EventPayload) bool {
}
// recordTransactionStart records the start of the transaction.
-func (ex *connExecutor) recordTransactionStart() {
+func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) {
+ // Transaction fingerprint ID will be available once transaction finishes
+ // execution.
+ ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{
+ TxnID: txnID,
+ TxnFingerprintID: roachpb.InvalidTransactionFingerprintID,
+ })
+
ex.state.mu.RLock()
txnStart := ex.state.mu.txnStart
ex.state.mu.RUnlock()
@@ -1934,6 +1943,13 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) {
transactionFingerprintID,
)
}
+ if ex.server.cfg.TestingKnobs.BeforeTxnStatsRecorded != nil {
+ ex.server.cfg.TestingKnobs.BeforeTxnStatsRecorded(
+ ex.sessionData(),
+ ev.txnID,
+ transactionFingerprintID,
+ )
+ }
err := ex.recordTransaction(ctx, transactionFingerprintID, ev, implicit, txnStart)
if err != nil {
if log.V(1) {
@@ -1991,13 +2007,18 @@ func (ex *connExecutor) recordTransaction(
ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1)
ex.metrics.EngineMetrics.SQLTxnLatency.RecordValue(txnTime.Nanoseconds())
+ ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{
+ TxnID: ev.txnID,
+ TxnFingerprintID: transactionFingerprintID,
+ })
+
txnServiceLat := ex.phaseTimes.GetTransactionServiceLatency()
txnRetryLat := ex.phaseTimes.GetTransactionRetryLatency()
commitLat := ex.phaseTimes.GetCommitLatency()
recordedTxnStats := sqlstats.RecordedTxnStats{
TransactionTimeSec: txnTime.Seconds(),
- Committed: ev == txnCommit,
+ Committed: ev.eventTyp == txnCommit,
ImplicitTxn: implicit,
RetryCount: int64(ex.extraTxnState.autoRetryCounter),
StatementFingerprintIDs: ex.extraTxnState.transactionStatementFingerprintIDs,
diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go
index b9f44c86e8e7..b2d31a46799a 100644
--- a/pkg/sql/conn_fsm.go
+++ b/pkg/sql/conn_fsm.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlfsm"
"github.com/cockroachdb/cockroach/pkg/util/fsm"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
)
// Constants for the String() representation of the session states. Shared with
@@ -199,6 +200,10 @@ func (eventRetriableErr) Event() {}
func (eventTxnRestart) Event() {}
func (eventTxnReleased) Event() {}
+// Other constants.
+
+var emptyTxnID = uuid.UUID{}
+
// TxnStateTransitions describe the transitions used by a connExecutor's
// fsm.Machine. Args.Extended is a txnState, which is muted by the Actions.
//
@@ -231,7 +236,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
Next: stateNoTxn{},
Action: func(args fsm.Args) error {
ts := args.Extended.(*txnState)
- ts.setAdvanceInfo(skipBatch, noRewind, noEvent)
+ ts.setAdvanceInfo(skipBatch, noRewind, txnEvent{eventTyp: noEvent})
return nil
},
},
@@ -284,7 +289,8 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
args.Extended.(*txnState).setAdvanceInfo(
rewind,
args.Payload.(eventRetriableErrPayload).rewCap,
- txnRestart)
+ txnEvent{eventTyp: txnRestart},
+ )
return nil
},
},
@@ -306,7 +312,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
Next: stateAborted{},
Action: func(args fsm.Args) error {
ts := args.Extended.(*txnState)
- ts.setAdvanceInfo(skipBatch, noRewind, noEvent)
+ ts.setAdvanceInfo(skipBatch, noRewind, txnEvent{eventTyp: noEvent})
return nil
},
},
@@ -316,7 +322,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
Description: "ROLLBACK TO SAVEPOINT cockroach_restart",
Next: stateOpen{ImplicitTxn: fsm.False},
Action: func(args fsm.Args) error {
- args.Extended.(*txnState).setAdvanceInfo(advanceOne, noRewind, txnRestart)
+ args.Extended.(*txnState).setAdvanceInfo(
+ advanceOne,
+ noRewind,
+ txnEvent{eventTyp: txnRestart},
+ )
return nil
},
},
@@ -325,7 +335,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
Action: func(args fsm.Args) error {
// Note: Preparing the KV txn for restart has already happened by this
// point.
- args.Extended.(*txnState).setAdvanceInfo(skipBatch, noRewind, noEvent)
+ args.Extended.(*txnState).setAdvanceInfo(
+ skipBatch,
+ noRewind,
+ txnEvent{eventTyp: noEvent},
+ )
return nil
},
},
@@ -333,7 +347,15 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
Description: "RELEASE SAVEPOINT cockroach_restart",
Next: stateCommitWait{},
Action: func(args fsm.Args) error {
- args.Extended.(*txnState).setAdvanceInfo(advanceOne, noRewind, txnCommit)
+ ts := args.Extended.(*txnState)
+ ts.mu.Lock()
+ txnID := ts.mu.txn.ID()
+ ts.mu.Unlock()
+ ts.setAdvanceInfo(
+ advanceOne,
+ noRewind,
+ txnEvent{eventTyp: txnCommit, txnID: txnID},
+ )
return nil
},
},
@@ -361,7 +383,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
Description: "any other statement",
Next: stateAborted{},
Action: func(args fsm.Args) error {
- args.Extended.(*txnState).setAdvanceInfo(skipBatch, noRewind, noEvent)
+ args.Extended.(*txnState).setAdvanceInfo(
+ skipBatch,
+ noRewind,
+ txnEvent{eventTyp: noEvent},
+ )
return nil
},
},
@@ -377,7 +403,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) success",
Next: stateOpen{ImplicitTxn: fsm.False},
Action: func(args fsm.Args) error {
- args.Extended.(*txnState).setAdvanceInfo(advanceOne, noRewind, noEvent)
+ args.Extended.(*txnState).setAdvanceInfo(
+ advanceOne,
+ noRewind,
+ txnEvent{eventTyp: noEvent},
+ )
return nil
},
},
@@ -387,7 +417,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart",
Next: stateAborted{},
Action: func(args fsm.Args) error {
- args.Extended.(*txnState).setAdvanceInfo(skipBatch, noRewind, noEvent)
+ args.Extended.(*txnState).setAdvanceInfo(
+ skipBatch,
+ noRewind,
+ txnEvent{eventTyp: noEvent},
+ )
return nil
},
},
@@ -396,7 +430,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
Description: "ROLLBACK TO SAVEPOINT cockroach_restart",
Next: stateOpen{ImplicitTxn: fsm.False},
Action: func(args fsm.Args) error {
- args.Extended.(*txnState).setAdvanceInfo(advanceOne, noRewind, txnRestart)
+ args.Extended.(*txnState).setAdvanceInfo(
+ advanceOne,
+ noRewind,
+ txnEvent{eventTyp: txnRestart},
+ )
return nil
},
},
@@ -421,7 +459,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
Description: "any other statement",
Next: stateCommitWait{},
Action: func(args fsm.Args) error {
- args.Extended.(*txnState).setAdvanceInfo(skipBatch, noRewind, noEvent)
+ args.Extended.(*txnState).setAdvanceInfo(
+ skipBatch,
+ noRewind,
+ txnEvent{eventTyp: noEvent},
+ )
return nil
},
},
@@ -445,7 +487,7 @@ func noTxnToOpen(args fsm.Args) error {
advCode = stayInPlace
}
- ts.resetForNewSQLTxn(
+ newTxnID := ts.resetForNewSQLTxn(
connCtx,
txnTyp,
payload.txnSQLTimestamp,
@@ -455,15 +497,19 @@ func noTxnToOpen(args fsm.Args) error {
nil, /* txn */
payload.tranCtx,
)
- ts.setAdvanceInfo(advCode, noRewind, txnStart)
+ ts.setAdvanceInfo(
+ advCode,
+ noRewind,
+ txnEvent{eventTyp: txnStart, txnID: newTxnID},
+ )
return nil
}
// finishTxn finishes the transaction. It also calls setAdvanceInfo() with the
// given event.
-func (ts *txnState) finishTxn(ev txnEvent) error {
- ts.finishSQLTxn()
- ts.setAdvanceInfo(advanceOne, noRewind, ev)
+func (ts *txnState) finishTxn(ev txnEventType) error {
+ finishedTxnID := ts.finishSQLTxn()
+ ts.setAdvanceInfo(advanceOne, noRewind, txnEvent{eventTyp: ev, txnID: finishedTxnID})
return nil
}
@@ -471,8 +517,12 @@ func (ts *txnState) finishTxn(ev txnEvent) error {
func cleanupAndFinishOnError(args fsm.Args) error {
ts := args.Extended.(*txnState)
ts.mu.txn.CleanupOnError(ts.Ctx, args.Payload.(payloadWithError).errorCause())
- ts.finishSQLTxn()
- ts.setAdvanceInfo(skipBatch, noRewind, txnRollback)
+ finishedTxnID := ts.finishSQLTxn()
+ ts.setAdvanceInfo(
+ skipBatch,
+ noRewind,
+ txnEvent{eventTyp: txnRollback, txnID: finishedTxnID},
+ )
return nil
}
@@ -488,8 +538,12 @@ var BoundTxnStateTransitions = fsm.Compile(fsm.Pattern{
Next: stateInternalError{},
Action: func(args fsm.Args) error {
ts := args.Extended.(*txnState)
- ts.finishSQLTxn()
- ts.setAdvanceInfo(skipBatch, noRewind, txnRollback)
+ finishedTxnID := ts.finishSQLTxn()
+ ts.setAdvanceInfo(
+ skipBatch,
+ noRewind,
+ txnEvent{eventTyp: txnRollback, txnID: finishedTxnID},
+ )
return nil
},
},
@@ -497,8 +551,12 @@ var BoundTxnStateTransitions = fsm.Compile(fsm.Pattern{
Next: stateInternalError{},
Action: func(args fsm.Args) error {
ts := args.Extended.(*txnState)
- ts.finishSQLTxn()
- ts.setAdvanceInfo(skipBatch, noRewind, txnRollback)
+ finishedTxnID := ts.finishSQLTxn()
+ ts.setAdvanceInfo(
+ skipBatch,
+ noRewind,
+ txnEvent{eventTyp: txnRollback, txnID: finishedTxnID},
+ )
return nil
},
},
diff --git a/pkg/sql/contention/txnidcache/BUILD.bazel b/pkg/sql/contention/txnidcache/BUILD.bazel
new file mode 100644
index 000000000000..87685dab0f76
--- /dev/null
+++ b/pkg/sql/contention/txnidcache/BUILD.bazel
@@ -0,0 +1,53 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+ name = "txnidcache",
+ srcs = [
+ "cluster_settings.go",
+ "concurrent_write_buffer.go",
+ "metrics.go",
+ "txn_id_cache.go",
+ "writer.go",
+ ],
+ importpath = "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/roachpb:with-mocks",
+ "//pkg/settings",
+ "//pkg/settings/cluster",
+ "//pkg/util/cache",
+ "//pkg/util/encoding",
+ "//pkg/util/metric",
+ "//pkg/util/stop",
+ "//pkg/util/syncutil",
+ "//pkg/util/uuid",
+ ],
+)
+
+go_test(
+ name = "txnidcache_test",
+ srcs = [
+ "main_test.go",
+ "txn_id_cache_test.go",
+ ],
+ deps = [
+ "//pkg/kv",
+ "//pkg/roachpb:with-mocks",
+ "//pkg/security",
+ "//pkg/security/securitytest",
+ "//pkg/server",
+ "//pkg/sql",
+ "//pkg/sql/sessiondata",
+ "//pkg/sql/tests",
+ "//pkg/testutils",
+ "//pkg/testutils/serverutils",
+ "//pkg/testutils/sqlutils",
+ "//pkg/testutils/testcluster",
+ "//pkg/util/leaktest",
+ "//pkg/util/log",
+ "//pkg/util/syncutil",
+ "//pkg/util/uuid",
+ "@com_github_cockroachdb_errors//:errors",
+ "@com_github_stretchr_testify//require",
+ ],
+)
diff --git a/pkg/sql/contention/txnidcache/cluster_settings.go b/pkg/sql/contention/txnidcache/cluster_settings.go
new file mode 100644
index 000000000000..c2fbf3c47474
--- /dev/null
+++ b/pkg/sql/contention/txnidcache/cluster_settings.go
@@ -0,0 +1,21 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package txnidcache
+
+import "github.com/cockroachdb/cockroach/pkg/settings"
+
+// MaxSize limits the maximum byte size can be used by the TxnIDCache.
+var MaxSize = settings.RegisterByteSizeSetting(
+ settings.TenantWritable,
+ `sql.contention.txn_id_cache.max_size`,
+ "the maximum byte size TxnID cache will use",
+ 64*1024*1024, // 64 MB
+).WithPublic()
diff --git a/pkg/sql/contention/txnidcache/concurrent_write_buffer.go b/pkg/sql/contention/txnidcache/concurrent_write_buffer.go
new file mode 100644
index 000000000000..2827afd1eb8a
--- /dev/null
+++ b/pkg/sql/contention/txnidcache/concurrent_write_buffer.go
@@ -0,0 +1,118 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package txnidcache
+
+import (
+ "sync"
+ "sync/atomic"
+
+ "github.com/cockroachdb/cockroach/pkg/util/syncutil"
+)
+
+const messageBlockSize = 1024
+
+type messageBlock [messageBlockSize]ResolvedTxnID
+
+// concurrentWriteBuffer is a data structure that optimizes for concurrent
+// writes and also implements the Writer interface.
+//
+// Any write requests initially starts by holding a read lock (flushSyncLock)
+// and then reserves an index to the messageBlock (a fixed-length array). If the
+// reserved index is valid, concurrentWriteBuffer immediately writes to the
+// array at the reserved index. However, if the reserved index is not valid,
+// (that is, array index out of bound), there are two scenarios:
+// 1. If the reserved index == size of the array, then the caller of Record()
+// method is responsible for flushing the entire array into the channel. The
+// caller does so by upgrading the read-lock to a write-lock, therefore
+// blocks all future writers from writing to the shared array. After the
+// flush is performed, the write-lock is then downgraded to a read-lock.
+// 2. If the reserved index > size of the array, then the caller of Record()
+// is blocked until the array is flushed. This is achieved by waiting on the
+// conditional variable (flushDone) while holding onto the read-lock. After
+// the flush is completed, the writer is unblocked and allowed to retry.
+type concurrentWriteBuffer struct {
+ flushSyncLock syncutil.RWMutex
+ flushDone sync.Cond
+
+ msgBlockPool *sync.Pool
+
+ // msgBlock is the temporary buffer that concurrentWriteBuffer uses to batch
+ // write requests before sending them into the channel.
+ msgBlock *messageBlock
+
+ // atomicIdx is the index pointing into the fixed-length array within the
+ // msgBlock.This should only be accessed using atomic package.
+ atomicIdx int64
+
+ // sink is the flush target that concurrentWriteBuffer flushes to once
+ // msgBlock is full.
+ sink messageSink
+}
+
+var _ Writer = &concurrentWriteBuffer{}
+
+// newConcurrentWriteBuffer returns a new instance of concurrentWriteBuffer.
+func newConcurrentWriteBuffer(sink messageSink, msgBlockPool *sync.Pool) *concurrentWriteBuffer {
+ writeBuffer := &concurrentWriteBuffer{
+ sink: sink,
+ msgBlockPool: msgBlockPool,
+ msgBlock: msgBlockPool.Get().(*messageBlock),
+ }
+ writeBuffer.flushDone.L = writeBuffer.flushSyncLock.RLocker()
+ return writeBuffer
+}
+
+// Record records a mapping from txnID to its corresponding transaction
+// fingerprint ID. Record is safe to be used concurrently.
+func (c *concurrentWriteBuffer) Record(resolvedTxnID ResolvedTxnID) {
+ c.flushSyncLock.RLock()
+ defer c.flushSyncLock.RUnlock()
+ for {
+ reservedIdx := c.reserveMsgBlockIndex()
+ if reservedIdx < messageBlockSize {
+ c.msgBlock[reservedIdx] = resolvedTxnID
+ return
+ } else if reservedIdx == messageBlockSize {
+ c.flushMsgBlockToChannelRLocked()
+ } else {
+ c.flushDone.Wait()
+ }
+ }
+}
+
+// Flush flushes concurrentWriteBuffer into the channel. It implements the
+// txnidcache.Writer interface.
+func (c *concurrentWriteBuffer) Flush() {
+ c.flushSyncLock.Lock()
+ c.flushMsgBlockToChannelLocked()
+ c.flushSyncLock.Unlock()
+}
+
+func (c *concurrentWriteBuffer) flushMsgBlockToChannelRLocked() {
+ // We upgrade the read-lock to a write-lock, then when we are done flushing,
+ // the lock is downgraded to a read-lock.
+ c.flushSyncLock.RUnlock()
+ defer c.flushSyncLock.RLock()
+ c.flushSyncLock.Lock()
+ defer c.flushSyncLock.Unlock()
+ c.flushMsgBlockToChannelLocked()
+}
+
+func (c *concurrentWriteBuffer) flushMsgBlockToChannelLocked() {
+ c.sink.push(c.msgBlock)
+ c.msgBlock = c.msgBlockPool.Get().(*messageBlock)
+ c.flushDone.Broadcast()
+ atomic.StoreInt64(&c.atomicIdx, 0)
+}
+
+func (c *concurrentWriteBuffer) reserveMsgBlockIndex() int64 {
+ return atomic.AddInt64(&c.atomicIdx, 1) - 1 // since array is 0-indexed.
+}
diff --git a/pkg/sql/contention/txnidcache/main_test.go b/pkg/sql/contention/txnidcache/main_test.go
new file mode 100644
index 000000000000..327d352a4d3b
--- /dev/null
+++ b/pkg/sql/contention/txnidcache/main_test.go
@@ -0,0 +1,29 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package txnidcache_test
+
+import (
+ "os"
+ "testing"
+
+ "github.com/cockroachdb/cockroach/pkg/security"
+ "github.com/cockroachdb/cockroach/pkg/security/securitytest"
+ "github.com/cockroachdb/cockroach/pkg/server"
+ "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+)
+
+func TestMain(m *testing.M) {
+ security.SetAssetLoader(securitytest.EmbeddedAssets)
+ serverutils.InitTestServerFactory(server.TestServerFactory)
+ serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
+ os.Exit(m.Run())
+}
diff --git a/pkg/sql/contention/txnidcache/metrics.go b/pkg/sql/contention/txnidcache/metrics.go
new file mode 100644
index 000000000000..35a6736b3cfc
--- /dev/null
+++ b/pkg/sql/contention/txnidcache/metrics.go
@@ -0,0 +1,43 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package txnidcache
+
+import "github.com/cockroachdb/cockroach/pkg/util/metric"
+
+// Metrics is a structs that include all metrics related to contention
+// subsystem.
+type Metrics struct {
+ CacheMissCounter *metric.Counter
+ CacheReadCounter *metric.Counter
+}
+
+var _ metric.Struct = Metrics{}
+
+// MetricStruct implements the metric.Struct interface.
+func (Metrics) MetricStruct() {}
+
+// NewMetrics returns a new instance of Metrics.
+func NewMetrics() Metrics {
+ return Metrics{
+ CacheMissCounter: metric.NewCounter(metric.Metadata{
+ Name: "sql.contention.txn_id_cache.miss",
+ Help: "Number of cache misses",
+ Measurement: "Cache miss",
+ Unit: metric.Unit_COUNT,
+ }),
+ CacheReadCounter: metric.NewCounter(metric.Metadata{
+ Name: "sql.contention.txn_id_cache.read",
+ Help: "Number of cache read",
+ Measurement: "Cache read",
+ Unit: metric.Unit_COUNT,
+ }),
+ }
+}
diff --git a/pkg/sql/contention/txnidcache/txn_id_cache.go b/pkg/sql/contention/txnidcache/txn_id_cache.go
new file mode 100644
index 000000000000..9384ed704430
--- /dev/null
+++ b/pkg/sql/contention/txnidcache/txn_id_cache.go
@@ -0,0 +1,245 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package txnidcache
+
+import (
+ "context"
+ "sync"
+
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
+ "github.com/cockroachdb/cockroach/pkg/util/cache"
+ "github.com/cockroachdb/cockroach/pkg/util/stop"
+ "github.com/cockroachdb/cockroach/pkg/util/syncutil"
+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
+)
+
+// Reader is the interface that can be used to query the transaction fingerprint
+// ID of a transaction ID.
+type Reader interface {
+ // Lookup returns the corresponding transaction fingerprint ID for a given txnID,
+ // if the given txnID has no entry in the Cache, the returned "found" boolean
+ // will be false.
+ Lookup(txnID uuid.UUID) (result roachpb.TransactionFingerprintID, found bool)
+}
+
+// Writer is the interface that can be used to write to txnidcache.
+type Writer interface {
+ // Record writes a pair of transactionID and transaction fingerprint ID
+ // into a temporary buffer. This buffer will eventually be flushed into
+ // the transaction ID cache asynchronously.
+ Record(resolvedTxnID ResolvedTxnID)
+
+ // Flush starts the flushing process of writer's temporary buffer.
+ Flush()
+}
+
+type messageSink interface {
+ // push allows a messageBlock to be pushed into the pusher.
+ push(*messageBlock)
+}
+
+const channelSize = 128
+
+// Cache stores the mapping from the Transaction IDs (UUID) of recently
+// executed transactions to their corresponding Transaction Fingerprint ID (uint64).
+// The size of Cache is controlled via sql.contention.txn_id_cache.max_size
+// cluster setting, and it follows FIFO eviction policy once the cache size
+// reaches the limit defined by the cluster setting.
+//
+// Cache's overall architecture is as follows:
+// +------------------------------------------------------------+
+// | connExecutor --------* |
+// | | writes resolvedTxnID to Writer |
+// | v |
+// | +---------------------------------------------------+ |
+// | | Writer | |
+// | | | |
+// | | Writer contains multiple shards of concurrent | |
+// | | write buffer. Each incoming resolvedTxnID is | |
+// | | first hashed to a corresponding shard, and then | |
+// | | is written to the concurrent write buffer | |
+// | | backing that shard. Once the concurrent write | |
+// | | buffer is full, a flush is performed and the | |
+// | | content of the buffer is send into the channel. | |
+// | | | |
+// | | +------------+ | |
+// | | | shard1 | | |
+// | | +------------+ | |
+// | | | shard2 | | |
+// | | +------------+ | |
+// | | | shard3 | | |
+// | | +------------+ | |
+// | | | ..... | | |
+// | | | ..... | | |
+// | | +------------+ | |
+// | | | shard128 | | |
+// | | +------------+ | |
+// | | | |
+// | +-----+---------------------------------------------+ |
+// +------------|-----------------------------------------------+
+// |
+// |
+// V
+// channel
+// ^
+// |
+// Cache polls the channel using a goroutine and push the
+// | messageBlock into its storage.
+// |
+// +----------------------------------+
+// | Cache: |
+// | The cache contains a |
+// | FIFO buffer backed by |
+// | cache.UnorderedCache |
+// +----------------------------------+
+type Cache struct {
+ st *cluster.Settings
+
+ msgChan chan *messageBlock
+ closeCh chan struct{}
+
+ mu struct {
+ syncutil.RWMutex
+
+ store *cache.UnorderedCache
+ }
+
+ messageBlockPool *sync.Pool
+
+ writer Writer
+
+ metrics *Metrics
+}
+
+var (
+ entrySize = int64(uuid.UUID{}.Size()) +
+ roachpb.TransactionFingerprintID(0).Size()
+)
+
+// ResolvedTxnID represents a TxnID that is resolved to its corresponding
+// TxnFingerprintID.
+type ResolvedTxnID struct {
+ TxnID uuid.UUID
+ TxnFingerprintID roachpb.TransactionFingerprintID
+}
+
+func (r *ResolvedTxnID) valid() bool {
+ return r.TxnID != uuid.UUID{}
+}
+
+var (
+ _ Reader = &Cache{}
+ _ Writer = &Cache{}
+ _ messageSink = &Cache{}
+)
+
+// NewTxnIDCache creates a new instance of Cache.
+func NewTxnIDCache(st *cluster.Settings, metrics *Metrics) *Cache {
+ t := &Cache{
+ st: st,
+ metrics: metrics,
+ msgChan: make(chan *messageBlock, channelSize),
+ closeCh: make(chan struct{}),
+ }
+
+ t.messageBlockPool = &sync.Pool{
+ New: func() interface{} {
+ return &messageBlock{}
+ },
+ }
+
+ t.mu.store = cache.NewUnorderedCache(cache.Config{
+ Policy: cache.CacheFIFO,
+ ShouldEvict: func(size int, _, _ interface{}) bool {
+ return int64(size)*entrySize > MaxSize.Get(&st.SV)
+ },
+ })
+
+ t.writer = newWriter(t, t.messageBlockPool)
+ return t
+}
+
+// Start implements the Provider interface.
+func (t *Cache) Start(ctx context.Context, stopper *stop.Stopper) {
+ addBlockToStore := func(msgBlock *messageBlock) {
+ t.mu.Lock()
+ defer t.mu.Unlock()
+ for blockIdx := range msgBlock {
+ if !msgBlock[blockIdx].valid() {
+ break
+ }
+ t.mu.store.Add(msgBlock[blockIdx].TxnID, msgBlock[blockIdx].TxnFingerprintID)
+ }
+ }
+
+ consumeBlock := func(b *messageBlock) {
+ addBlockToStore(b)
+ *b = messageBlock{}
+ t.messageBlockPool.Put(b)
+ }
+
+ err := stopper.RunAsyncTask(ctx, "txn-id-cache-ingest", func(ctx context.Context) {
+ for {
+ select {
+ case msgBlock := <-t.msgChan:
+ consumeBlock(msgBlock)
+ case <-stopper.ShouldQuiesce():
+ close(t.closeCh)
+ return
+ }
+ }
+ })
+ if err != nil {
+ close(t.closeCh)
+ }
+}
+
+// Lookup implements the Reader interface.
+func (t *Cache) Lookup(txnID uuid.UUID) (result roachpb.TransactionFingerprintID, found bool) {
+ t.metrics.CacheReadCounter.Inc(1)
+
+ t.mu.RLock()
+ defer t.mu.RUnlock()
+
+ txnFingerprintID, found := t.mu.store.Get(txnID)
+ if !found {
+ t.metrics.CacheMissCounter.Inc(1)
+ return roachpb.InvalidTransactionFingerprintID, found
+ }
+
+ return txnFingerprintID.(roachpb.TransactionFingerprintID), found
+}
+
+// Record implements the Writer interface.
+func (t *Cache) Record(resolvedTxnID ResolvedTxnID) {
+ t.writer.Record(resolvedTxnID)
+}
+
+// push implements the messageSink interface.
+func (t *Cache) push(msg *messageBlock) {
+ select {
+ case t.msgChan <- msg:
+ case <-t.closeCh:
+ }
+}
+
+// Flush flushes the resolved txn IDs in the Writer into the Cache.
+func (t *Cache) Flush() {
+ t.writer.Flush()
+}
+
+// Size return the current size of the Cache.
+func (t *Cache) Size() int64 {
+ t.mu.RLock()
+ defer t.mu.RUnlock()
+ return int64(t.mu.store.Len()) * entrySize
+}
diff --git a/pkg/sql/contention/txnidcache/txn_id_cache_test.go b/pkg/sql/contention/txnidcache/txn_id_cache_test.go
new file mode 100644
index 000000000000..4e70c0a5ee9f
--- /dev/null
+++ b/pkg/sql/contention/txnidcache/txn_id_cache_test.go
@@ -0,0 +1,282 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package txnidcache_test
+
+import (
+ "context"
+ "strings"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/kv"
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/security"
+ "github.com/cockroachdb/cockroach/pkg/sql"
+ "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
+ "github.com/cockroachdb/cockroach/pkg/sql/tests"
+ "github.com/cockroachdb/cockroach/pkg/testutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+ "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+ "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/syncutil"
+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
+ "github.com/cockroachdb/errors"
+ "github.com/stretchr/testify/require"
+)
+
+// TestTransactionIDCache tests the correctness of the txnidcache.Cache.
+func TestTransactionIDCache(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ ctx := context.Background()
+ params, _ := tests.CreateTestServerParams()
+
+ appName := "txnIDCacheTest"
+ expectedTxnIDToUUIDMapping := make(map[uuid.UUID]roachpb.TransactionFingerprintID)
+ injector := runtimeHookInjector{}
+
+ injector.setHook(func(
+ sessionData *sessiondata.SessionData,
+ txnID uuid.UUID,
+ txnFingerprintID roachpb.TransactionFingerprintID,
+ ) {
+ if strings.Contains(sessionData.ApplicationName, appName) {
+ expectedTxnIDToUUIDMapping[txnID] = txnFingerprintID
+ }
+ })
+
+ params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{
+ BeforeTxnStatsRecorded: injector.hook,
+ }
+
+ testServer, sqlConn, kvDB := serverutils.StartServer(t, params)
+ defer func() {
+ require.NoError(t, sqlConn.Close())
+ testServer.Stopper().Stop(ctx)
+ }()
+
+ testConn := sqlutils.MakeSQLRunner(sqlConn)
+
+ // Set the cache size limit to a very generous amount to prevent premature
+ // eviction.
+ testConn.Exec(t, "SET CLUSTER SETTING sql.contention.txn_id_cache.max_size = '1GB'")
+
+ testConn.Exec(t, "CREATE DATABASE txnIDTest")
+ testConn.Exec(t, "USE txnIDTest")
+ testConn.Exec(t, "CREATE TABLE t AS SELECT generate_series(1, 10)")
+ testConn.Exec(t, "SET application_name = $1", appName)
+
+ testCases := []struct {
+ stmts []string
+ explicit bool
+ }{
+ // Implicit transactions that will have same statement fingerprint.
+ {
+ stmts: []string{"SELECT 1"},
+ explicit: false,
+ },
+ {
+ stmts: []string{"SELECT 2"},
+ explicit: false,
+ },
+
+ // Implicit transaction that have different statement fingerprints.
+ {
+ stmts: []string{"SELECT 1, 1"},
+ explicit: false,
+ },
+ {
+ stmts: []string{"SELECT 1, 1, 2"},
+ explicit: false,
+ },
+
+ // Explicit Transactions.
+ {
+ stmts: []string{"SELECT 1"},
+ explicit: true,
+ },
+ {
+ stmts: []string{"SELECT 5"},
+ explicit: true,
+ },
+ {
+ stmts: []string{"SELECT 5", "SELECT 6, 7"},
+ explicit: true,
+ },
+ }
+
+ // Send test statements into both regular SQL connection and internal
+ // executor to test both code paths.
+ for _, tc := range testCases {
+ if tc.explicit {
+ testConn.Exec(t, "BEGIN")
+ }
+ {
+ for _, stmt := range tc.stmts {
+ testConn.Exec(t, stmt)
+ }
+ }
+ if tc.explicit {
+ testConn.Exec(t, "COMMIT")
+ }
+ }
+
+ ie := testServer.InternalExecutor().(*sql.InternalExecutor)
+
+ for _, tc := range testCases {
+ // Send statements one by one since internal executor doesn't support
+ // sending a batch of statements.
+ for _, stmt := range tc.stmts {
+ var txn *kv.Txn
+ if tc.explicit {
+ txn = kvDB.NewTxn(ctx, "")
+ }
+ _, err := ie.QueryRowEx(
+ ctx,
+ appName,
+ txn,
+ sessiondata.InternalExecutorOverride{
+ User: security.RootUserName(),
+ },
+ stmt,
+ )
+ require.NoError(t, err)
+ if tc.explicit {
+ require.NoError(t, txn.Commit(ctx))
+ }
+
+ require.NoError(t, err)
+ }
+ }
+
+ // Ensure we have intercepted all transactions, the expected size is
+ // calculated as:
+ // # stmt executed in regular executor
+ // + # stmt executed in internal executor
+ // + 1 (`SET application_name` executed previously in regular SQL Conn)
+ // - 3 (explicit txns executed in internal executor due to
+ // https://github.com/cockroachdb/cockroach/issues/73091)
+ expectedTxnIDCacheSize := len(testCases)*2 + 1 - 3
+ require.Equal(t, expectedTxnIDCacheSize, len(expectedTxnIDToUUIDMapping))
+
+ sqlServer := testServer.SQLServer().(*sql.Server)
+ txnIDCache := sqlServer.GetTxnIDCache()
+
+ txnIDCache.Flush()
+ t.Run("resolved_txn_id_cache_record", func(t *testing.T) {
+ testutils.SucceedsWithin(t, func() error {
+ for txnID, expectedTxnFingerprintID := range expectedTxnIDToUUIDMapping {
+ actualTxnFingerprintID, ok := txnIDCache.Lookup(txnID)
+ if !ok {
+ return errors.Newf("expected to find txn(%s) with fingerprintID: "+
+ "%d, but it was not found.",
+ txnID, expectedTxnFingerprintID,
+ )
+ }
+ if expectedTxnFingerprintID != actualTxnFingerprintID {
+ return errors.Newf("expected to find txn(%s) with fingerprintID: %d, but the actual fingerprintID is: %d", txnID, expectedTxnFingerprintID, actualTxnFingerprintID)
+ }
+ }
+ return nil
+ }, 3*time.Second)
+
+ sizePreEviction := txnIDCache.Size()
+ testConn.Exec(t, "SET CLUSTER SETTING sql.contention.txn_id_cache.max_size = '10B'")
+
+ // Execute additional queries to ensure we are overflowing the size limit.
+ testConn.Exec(t, "SELECT 1")
+ txnIDCache.Flush()
+
+ testutils.SucceedsWithin(t, func() error {
+ sizePostEviction := txnIDCache.Size()
+ if sizePostEviction >= sizePreEviction {
+ return errors.Newf("expected txn id cache size to shrink below %d, "+
+ "but it has increased to %d", sizePreEviction, sizePostEviction)
+ }
+ return nil
+ }, 3*time.Second)
+ })
+
+ t.Run("provisional_txn_id_cache_record", func(t *testing.T) {
+ testConn.Exec(t, "RESET CLUSTER SETTING sql.contention.txn_id_cache.max_size")
+ callCaptured := uint32(0)
+
+ injector.setHook(func(
+ sessionData *sessiondata.SessionData,
+ txnID uuid.UUID,
+ txnFingerprintID roachpb.TransactionFingerprintID) {
+ if strings.Contains(sessionData.ApplicationName, appName) {
+ if txnFingerprintID != roachpb.InvalidTransactionFingerprintID {
+ txnIDCache.Flush()
+
+ testutils.SucceedsWithin(t, func() error {
+ existingTxnFingerprintID, ok := txnIDCache.Lookup(txnID)
+ if !ok {
+ return errors.Newf("expected provision txn fingerprint id to be found for "+
+ "txn(%s), but it was not", txnID)
+ }
+ if existingTxnFingerprintID != roachpb.InvalidTransactionFingerprintID {
+ return errors.Newf("expected txn (%s) to have a provisional"+
+ "txn fingerprint id, but this txn already has a resolved "+
+ "txn fingerprint id: %d", txnID, existingTxnFingerprintID)
+ }
+ return nil
+ }, 3*time.Second)
+ atomic.StoreUint32(&callCaptured, 1)
+ }
+ }
+ })
+
+ testConn.Exec(t, "BEGIN")
+ testConn.Exec(t, "SELECT 1")
+ testConn.Exec(t, "COMMIT")
+
+ require.NotZerof(t, atomic.LoadUint32(&callCaptured),
+ "expected to found provisional txn id cache record, "+
+ "but it was not found")
+ })
+}
+
+// runtimeHookInjector provides a way to dynamically inject a testing knobs
+// into a running cluster.
+type runtimeHookInjector struct {
+ syncutil.RWMutex
+ op func(
+ sessionData *sessiondata.SessionData,
+ txnID uuid.UUID,
+ txnFingerprintID roachpb.TransactionFingerprintID,
+ )
+}
+
+func (s *runtimeHookInjector) hook(
+ sessionData *sessiondata.SessionData,
+ txnID uuid.UUID,
+ txnFingerprintID roachpb.TransactionFingerprintID,
+) {
+ s.RLock()
+ defer s.RUnlock()
+ s.op(sessionData, txnID, txnFingerprintID)
+}
+
+func (s *runtimeHookInjector) setHook(
+ op func(
+ sessionData *sessiondata.SessionData,
+ txnID uuid.UUID,
+ txnFingerprintID roachpb.TransactionFingerprintID,
+ ),
+) {
+ s.Lock()
+ defer s.Unlock()
+ s.op = op
+}
diff --git a/pkg/sql/contention/txnidcache/writer.go b/pkg/sql/contention/txnidcache/writer.go
new file mode 100644
index 000000000000..f86bc6bd3916
--- /dev/null
+++ b/pkg/sql/contention/txnidcache/writer.go
@@ -0,0 +1,67 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package txnidcache
+
+import (
+ "sync"
+
+ "github.com/cockroachdb/cockroach/pkg/util/encoding"
+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
+)
+
+// There is no strong reason why shardCount is 16 beyond that Java's
+// ConcurrentHashMap also uses 16 shards and has reasonably good performance.
+const shardCount = 16
+
+type writer struct {
+ shards [shardCount]*concurrentWriteBuffer
+
+ sink messageSink
+ msgBlockPool *sync.Pool
+}
+
+var _ Writer = &writer{}
+
+func newWriter(sink messageSink, msgBlockPool *sync.Pool) *writer {
+ w := &writer{
+ sink: sink,
+ msgBlockPool: msgBlockPool,
+ }
+
+ for shardIdx := 0; shardIdx < shardCount; shardIdx++ {
+ w.shards[shardIdx] = newConcurrentWriteBuffer(sink, msgBlockPool)
+ }
+
+ return w
+}
+
+// Record implements the Writer interface.
+func (w *writer) Record(resolvedTxnID ResolvedTxnID) {
+ shardIdx := hashTxnID(resolvedTxnID.TxnID)
+ buffer := w.shards[shardIdx]
+ buffer.Record(resolvedTxnID)
+}
+
+// Flush implements the Writer interface.
+func (w *writer) Flush() {
+ for shardIdx := 0; shardIdx < shardCount; shardIdx++ {
+ w.shards[shardIdx].Flush()
+ }
+}
+
+func hashTxnID(txnID uuid.UUID) int {
+ b := txnID.GetBytes()
+ _, val, err := encoding.DecodeUint64Descending(b)
+ if err != nil {
+ panic(err)
+ }
+ return int(val % shardCount)
+}
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index c6fe3d88ab8f..aa431afcce2e 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -1408,6 +1408,14 @@ type ExecutorTestingKnobs struct {
// OnTxnRetry, if set, will be called if there is a transaction retry.
OnTxnRetry func(autoRetryReason error, evalCtx *tree.EvalContext)
+
+ // BeforeTxnStatsRecorded, if set, will be called before the statistics
+ // of a transaction is being recorded.
+ BeforeTxnStatsRecorded func(
+ sessionData *sessiondata.SessionData,
+ txnID uuid.UUID,
+ txnFingerprintID roachpb.TransactionFingerprintID,
+ )
}
// PGWireTestingKnobs contains knobs for the pgwire module.
diff --git a/pkg/sql/pgwire/server.go b/pkg/sql/pgwire/server.go
index 84b6e6e06213..0caa909af8cd 100644
--- a/pkg/sql/pgwire/server.go
+++ b/pkg/sql/pgwire/server.go
@@ -379,6 +379,7 @@ func (s *Server) Metrics() (res []interface{}) {
&s.SQLServer.InternalMetrics.EngineMetrics,
&s.SQLServer.InternalMetrics.GuardrailMetrics,
&s.SQLServer.ServerMetrics.StatsMetrics,
+ &s.SQLServer.ServerMetrics.ContentionSubsystemMetrics,
}
}
diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go
index a83bc8c924a9..93856f5acfe3 100644
--- a/pkg/sql/txn_state.go
+++ b/pkg/sql/txn_state.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"go.opentelemetry.io/otel/attribute"
)
@@ -131,7 +132,8 @@ const (
)
// resetForNewSQLTxn (re)initializes the txnState for a new transaction.
-// It creates a new client.Txn and initializes it using the session defaults.
+// It creates a new client.Txn and initializes it using the session defaults
+// and returns the ID of the new transaction.
//
// connCtx: The context in which the new transaction is started (usually a
// connection's context). ts.Ctx will be set to a child context and should be
@@ -156,7 +158,7 @@ func (ts *txnState) resetForNewSQLTxn(
readOnly tree.ReadWriteMode,
txn *kv.Txn,
tranCtx transitionCtx,
-) {
+) (txnID uuid.UUID) {
// Reset state vars to defaults.
ts.sqlTimestamp = sqlTimestamp
ts.isHistorical = false
@@ -204,6 +206,7 @@ func (ts *txnState) resetForNewSQLTxn(
}
ts.mu.txn = txn
}
+ txnID = ts.mu.txn.ID()
sp.SetTag("txn", attribute.StringValue(ts.mu.txn.ID().String()))
ts.mu.txnStart = timeutil.Now()
ts.mu.Unlock()
@@ -215,12 +218,15 @@ func (ts *txnState) resetForNewSQLTxn(
if err := ts.setReadOnlyMode(readOnly); err != nil {
panic(err)
}
+
+ return txnID
}
// finishSQLTxn finalizes a transaction's results and closes the root span for
// the current SQL txn. This needs to be called before resetForNewSQLTxn() is
-// called for starting another SQL txn.
-func (ts *txnState) finishSQLTxn() {
+// called for starting another SQL txn. The ID of the finalized transaction is
+// returned.
+func (ts *txnState) finishSQLTxn() (txnID uuid.UUID) {
ts.mon.Stop(ts.Ctx)
if ts.cancel != nil {
ts.cancel()
@@ -238,10 +244,12 @@ func (ts *txnState) finishSQLTxn() {
sp.Finish()
ts.Ctx = nil
ts.mu.Lock()
+ txnID = ts.mu.txn.ID()
ts.mu.txn = nil
ts.mu.txnStart = time.Time{}
ts.mu.Unlock()
ts.recordingThreshold = 0
+ return txnID
}
// finishExternalTxn is a stripped-down version of finishSQLTxn used by
@@ -344,15 +352,26 @@ const (
)
// txnEvent is part of advanceInfo, informing the connExecutor about some
-// transaction events. It is used by the connExecutor to clear state associated
-// with a SQL transaction (other than the state encapsulated in TxnState; e.g.
-// schema changes and portals).
-//
-//go:generate stringer -type=txnEvent
-type txnEvent int
+// transaction events.
+type txnEvent struct {
+ // eventTyp is used by the connExecutor to clear state associated
+ // with a SQL transaction (other than the state encapsulated in TxnState; e.g.
+ // schema changes and portals).
+ eventTyp txnEventType
+
+ // txnID is filled when transaction starts, commits or aborts.
+ // When a transaction starts, txnID is set to the ID of the transaction that
+ // was created.
+ // When a transaction commits or aborts, txnID is set to the ID of the
+ // transaction that just finished execution.
+ txnID uuid.UUID
+}
+
+//go:generate stringer -type=txnEventType
+type txnEventType int
const (
- noEvent txnEvent = iota
+ noEvent txnEventType = iota
// txnStart means that the statement that just ran started a new transaction.
// Note that when a transaction is restarted, txnStart event is not emitted.
diff --git a/pkg/sql/txn_state_test.go b/pkg/sql/txn_state_test.go
index 77c4992895ef..82872a1dcd30 100644
--- a/pkg/sql/txn_state_test.go
+++ b/pkg/sql/txn_state_test.go
@@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
)
@@ -142,7 +143,9 @@ func (tc *testContext) createNoTxnState() (fsm.State, *txnState) {
// checkAdv returns an error if adv does not match all the expected fields.
//
// Pass noRewindExpected for expRewPos if a rewind is not expected.
-func checkAdv(adv advanceInfo, expCode advanceCode, expRewPos CmdPos, expEv txnEvent) error {
+func checkAdv(
+ adv advanceInfo, expCode advanceCode, expRewPos CmdPos, expEv txnEventType, expTxnID uuid.UUID,
+) error {
if adv.code != expCode {
return errors.Errorf("expected code: %s, but got: %s (%+v)", expCode, adv.code, adv)
}
@@ -155,8 +158,25 @@ func checkAdv(adv advanceInfo, expCode advanceCode, expRewPos CmdPos, expEv txnE
return errors.Errorf("expected rewind to %d, but got: %+v", expRewPos, adv)
}
}
- if expEv != adv.txnEvent {
- return errors.Errorf("expected txnEvent: %s, got: %s", expEv, adv.txnEvent)
+ if expEv != adv.txnEvent.eventTyp {
+ return errors.Errorf("expected txnEventType: %s, got: %s", expEv, adv.txnEvent.eventTyp)
+ }
+
+ // The txnID field in advanceInfo is only checked when either:
+ // * txnStart: the txnID in this case should be not be empty.
+ // * txnCommit, txnRollback: the txnID in this case should equal to the
+ // transaction that just finished execution.
+ switch expEv {
+ case txnStart:
+ if adv.txnEvent.txnID.Equal(emptyTxnID) {
+ return errors.Errorf(
+ "expected txnID not to be empty, but it is")
+ }
+ case txnCommit, txnRollback:
+ if adv.txnEvent.txnID != expTxnID {
+ return errors.Errorf(
+ "expected txnID: %s, got: %s", expTxnID, adv.txnEvent.txnID)
+ }
}
return nil
}
@@ -223,7 +243,7 @@ func TestTransitions(t *testing.T) {
type expAdvance struct {
expCode advanceCode
- expEv txnEvent
+ expEv txnEventType
}
txnName := sqlTxnName
@@ -236,7 +256,9 @@ func TestTransitions(t *testing.T) {
// A function used to init the txnState to the desired state before the
// transition. The returned State and txnState are to be used to initialize
// a Machine.
- init func() (fsm.State, *txnState, error)
+ // If the initialized txnState contains an active transaction, its
+ // transaction ID is also returned. Else, emptyTxnID is returned.
+ init func() (fsm.State, *txnState, uuid.UUID, error)
// The event to deliver to the state machine.
ev fsm.Event
@@ -263,9 +285,9 @@ func TestTransitions(t *testing.T) {
{
// Start an implicit txn from NoTxn.
name: "NoTxn->Starting (implicit txn)",
- init: func() (fsm.State, *txnState, error) {
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
s, ts := testCon.createNoTxnState()
- return s, ts, nil
+ return s, ts, emptyTxnID, nil
},
ev: eventTxnStart{ImplicitTxn: fsm.True},
evPayload: makeEventTxnStartPayload(pri, tree.ReadWrite, timeutil.Now(),
@@ -288,9 +310,9 @@ func TestTransitions(t *testing.T) {
{
// Start an explicit txn from NoTxn.
name: "NoTxn->Starting (explicit txn)",
- init: func() (fsm.State, *txnState, error) {
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
s, ts := testCon.createNoTxnState()
- return s, ts, nil
+ return s, ts, emptyTxnID, nil
},
ev: eventTxnStart{ImplicitTxn: fsm.False},
evPayload: makeEventTxnStartPayload(pri, tree.ReadWrite, timeutil.Now(),
@@ -314,14 +336,14 @@ func TestTransitions(t *testing.T) {
{
// Finish an implicit txn.
name: "Open (implicit) -> NoTxn",
- init: func() (fsm.State, *txnState, error) {
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
s, ts := testCon.createOpenState(implicitTxn)
// We commit the KV transaction, as that's done by the layer below
// txnState.
if err := ts.mu.txn.Commit(ts.Ctx); err != nil {
- return nil, nil, err
+ return nil, nil, emptyTxnID, err
}
- return s, ts, nil
+ return s, ts, ts.mu.txn.ID(), nil
},
ev: eventTxnFinishCommitted{},
evPayload: nil,
@@ -335,14 +357,14 @@ func TestTransitions(t *testing.T) {
{
// Finish an explicit txn.
name: "Open (explicit) -> NoTxn",
- init: func() (fsm.State, *txnState, error) {
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
s, ts := testCon.createOpenState(explicitTxn)
// We commit the KV transaction, as that's done by the layer below
// txnState.
if err := ts.mu.txn.Commit(ts.Ctx); err != nil {
- return nil, nil, err
+ return nil, nil, emptyTxnID, err
}
- return s, ts, nil
+ return s, ts, ts.mu.txn.ID(), nil
},
ev: eventTxnFinishCommitted{},
evPayload: nil,
@@ -356,9 +378,9 @@ func TestTransitions(t *testing.T) {
{
// Get a retriable error while we can auto-retry.
name: "Open + auto-retry",
- init: func() (fsm.State, *txnState, error) {
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
s, ts := testCon.createOpenState(explicitTxn)
- return s, ts, nil
+ return s, ts, ts.mu.txn.ID(), nil
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
@@ -380,9 +402,9 @@ func TestTransitions(t *testing.T) {
// except this time the error is on a COMMIT. This shouldn't make any
// difference; we should still auto-retry like the above.
name: "Open + auto-retry (COMMIT)",
- init: func() (fsm.State, *txnState, error) {
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
s, ts := testCon.createOpenState(explicitTxn)
- return s, ts, nil
+ return s, ts, ts.mu.txn.ID(), nil
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
@@ -403,9 +425,9 @@ func TestTransitions(t *testing.T) {
// Get a retriable error when we can no longer auto-retry, but the client
// is doing client-side retries.
name: "Open + client retry",
- init: func() (fsm.State, *txnState, error) {
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
s, ts := testCon.createOpenState(explicitTxn)
- return s, ts, nil
+ return s, ts, ts.mu.txn.ID(), nil
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
@@ -430,9 +452,9 @@ func TestTransitions(t *testing.T) {
// done a RELEASE such that COMMIT couldn't get retriable errors), and so
// we can't go to RestartWait.
name: "Open + client retry + error on COMMIT",
- init: func() (fsm.State, *txnState, error) {
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
s, ts := testCon.createOpenState(explicitTxn)
- return s, ts, nil
+ return s, ts, ts.mu.txn.ID(), nil
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
@@ -452,9 +474,9 @@ func TestTransitions(t *testing.T) {
{
// An error on COMMIT leaves us in NoTxn, not in Aborted.
name: "Open + non-retriable error on COMMIT",
- init: func() (fsm.State, *txnState, error) {
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
s, ts := testCon.createOpenState(explicitTxn)
- return s, ts, nil
+ return s, ts, ts.mu.txn.ID(), nil
},
ev: eventNonRetriableErr{IsCommit: fsm.True},
evPayload: eventNonRetriableErrPayload{err: fmt.Errorf("test non-retriable err")},
@@ -470,9 +492,9 @@ func TestTransitions(t *testing.T) {
// Like the above, but this time with an implicit txn: we get a retriable
// error, but we can't auto-retry. We expect to go to NoTxn.
name: "Open + useless retriable error (implicit)",
- init: func() (fsm.State, *txnState, error) {
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
s, ts := testCon.createOpenState(implicitTxn)
- return s, ts, nil
+ return s, ts, ts.mu.txn.ID(), nil
},
evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) {
b := eventRetriableErrPayload{
@@ -492,9 +514,9 @@ func TestTransitions(t *testing.T) {
{
// We get a non-retriable error.
name: "Open + non-retriable error",
- init: func() (fsm.State, *txnState, error) {
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
s, ts := testCon.createOpenState(explicitTxn)
- return s, ts, nil
+ return s, ts, ts.mu.txn.ID(), nil
},
ev: eventNonRetriableErr{IsCommit: fsm.False},
evPayload: eventNonRetriableErrPayload{err: fmt.Errorf("test non-retriable err")},
@@ -508,11 +530,11 @@ func TestTransitions(t *testing.T) {
{
// We go to CommitWait (after a RELEASE SAVEPOINT).
name: "Open->CommitWait",
- init: func() (fsm.State, *txnState, error) {
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
s, ts := testCon.createOpenState(explicitTxn)
// Simulate what execution does before generating this event.
err := ts.mu.txn.Commit(ts.Ctx)
- return s, ts, err
+ return s, ts, ts.mu.txn.ID(), err
},
ev: eventTxnReleased{},
expState: stateCommitWait{},
@@ -525,9 +547,9 @@ func TestTransitions(t *testing.T) {
{
// Restarting from Open via ROLLBACK TO SAVEPOINT.
name: "Open + restart",
- init: func() (fsm.State, *txnState, error) {
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
s, ts := testCon.createOpenState(explicitTxn)
- return s, ts, nil
+ return s, ts, ts.mu.txn.ID(), nil
},
ev: eventTxnRestart{},
expState: stateOpen{ImplicitTxn: fsm.False},
@@ -545,9 +567,9 @@ func TestTransitions(t *testing.T) {
{
// The txn finished, such as after a ROLLBACK.
name: "Aborted->NoTxn",
- init: func() (fsm.State, *txnState, error) {
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
s, ts := testCon.createAbortedState()
- return s, ts, nil
+ return s, ts, ts.mu.txn.ID(), nil
},
ev: eventTxnFinishAborted{},
expState: stateNoTxn{},
@@ -560,9 +582,9 @@ func TestTransitions(t *testing.T) {
{
// The txn is starting again (ROLLBACK TO SAVEPOINT while in Aborted).
name: "Aborted->Open",
- init: func() (fsm.State, *txnState, error) {
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
s, ts := testCon.createAbortedState()
- return s, ts, nil
+ return s, ts, ts.mu.txn.ID(), nil
},
ev: eventSavepointRollback{},
expState: stateOpen{ImplicitTxn: fsm.False},
@@ -575,9 +597,9 @@ func TestTransitions(t *testing.T) {
{
// The txn is starting again (ROLLBACK TO SAVEPOINT cockroach_restart while in Aborted).
name: "Aborted->Restart",
- init: func() (fsm.State, *txnState, error) {
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
s, ts := testCon.createAbortedState()
- return s, ts, nil
+ return s, ts, ts.mu.txn.ID(), nil
},
ev: eventTxnRestart{},
expState: stateOpen{ImplicitTxn: fsm.False},
@@ -597,9 +619,9 @@ func TestTransitions(t *testing.T) {
// Verify that the historical timestamp from the evPayload is propagated
// to the expTxn.
name: "Aborted->Starting (historical)",
- init: func() (fsm.State, *txnState, error) {
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
s, ts := testCon.createAbortedState()
- return s, ts, nil
+ return s, ts, ts.mu.txn.ID(), nil
},
ev: eventTxnRestart{},
expState: stateOpen{ImplicitTxn: fsm.False},
@@ -616,8 +638,12 @@ func TestTransitions(t *testing.T) {
//
{
name: "CommitWait->NoTxn",
- init: func() (fsm.State, *txnState, error) {
- return testCon.createCommitWaitState()
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
+ s, ts, err := testCon.createCommitWaitState()
+ if err != nil {
+ return nil, nil, emptyTxnID, err
+ }
+ return s, ts, ts.mu.txn.ID(), nil
},
ev: eventTxnFinishCommitted{},
expState: stateNoTxn{},
@@ -629,8 +655,12 @@ func TestTransitions(t *testing.T) {
},
{
name: "CommitWait + err",
- init: func() (fsm.State, *txnState, error) {
- return testCon.createCommitWaitState()
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
+ s, ts, err := testCon.createCommitWaitState()
+ if err != nil {
+ return nil, nil, emptyTxnID, err
+ }
+ return s, ts, ts.mu.txn.ID(), nil
},
ev: eventNonRetriableErr{IsCommit: fsm.False},
evPayload: eventNonRetriableErrPayload{err: fmt.Errorf("test non-retriable err")},
@@ -645,7 +675,7 @@ func TestTransitions(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
// Get the initial state.
- s, ts, err := tc.init()
+ s, ts, expectedTxnID, err := tc.init()
if err != nil {
t.Fatal(err)
}
@@ -673,7 +703,7 @@ func TestTransitions(t *testing.T) {
expRewPos = dummyRewCap.rewindPos
}
if err := checkAdv(
- adv, tc.expAdv.expCode, expRewPos, tc.expAdv.expEv,
+ adv, tc.expAdv.expCode, expRewPos, tc.expAdv.expEv, expectedTxnID,
); err != nil {
t.Fatal(err)
}
diff --git a/pkg/sql/txnevent_string.go b/pkg/sql/txnevent_string.go
deleted file mode 100644
index 1590c65df04c..000000000000
--- a/pkg/sql/txnevent_string.go
+++ /dev/null
@@ -1,27 +0,0 @@
-// Code generated by "stringer"; DO NOT EDIT.
-
-package sql
-
-import "strconv"
-
-func _() {
- // An "invalid array index" compiler error signifies that the constant values have changed.
- // Re-run the stringer command to generate them again.
- var x [1]struct{}
- _ = x[noEvent-0]
- _ = x[txnStart-1]
- _ = x[txnCommit-2]
- _ = x[txnRollback-3]
- _ = x[txnRestart-4]
-}
-
-const _txnEvent_name = "noEventtxnStarttxnCommittxnRollbacktxnRestart"
-
-var _txnEvent_index = [...]uint8{0, 7, 15, 24, 35, 45}
-
-func (i txnEvent) String() string {
- if i < 0 || i >= txnEvent(len(_txnEvent_index)-1) {
- return "txnEvent(" + strconv.FormatInt(int64(i), 10) + ")"
- }
- return _txnEvent_name[_txnEvent_index[i]:_txnEvent_index[i+1]]
-}
diff --git a/pkg/sql/txneventtype_string.go b/pkg/sql/txneventtype_string.go
new file mode 100644
index 000000000000..91e7607ca6e7
--- /dev/null
+++ b/pkg/sql/txneventtype_string.go
@@ -0,0 +1,27 @@
+// Code generated by "stringer"; DO NOT EDIT.
+
+package sql
+
+import "strconv"
+
+func _() {
+ // An "invalid array index" compiler error signifies that the constant values have changed.
+ // Re-run the stringer command to generate them again.
+ var x [1]struct{}
+ _ = x[noEvent-0]
+ _ = x[txnStart-1]
+ _ = x[txnCommit-2]
+ _ = x[txnRollback-3]
+ _ = x[txnRestart-4]
+}
+
+const _txnEventType_name = "noEventtxnStarttxnCommittxnRollbacktxnRestart"
+
+var _txnEventType_index = [...]uint8{0, 7, 15, 24, 35, 45}
+
+func (i txnEventType) String() string {
+ if i < 0 || i >= txnEventType(len(_txnEventType_index)-1) {
+ return "txnEventType(" + strconv.FormatInt(int64(i), 10) + ")"
+ }
+ return _txnEventType_name[_txnEventType_index[i]:_txnEventType_index[i+1]]
+}
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go
index ee6f68183af9..f8ed23dec07d 100644
--- a/pkg/ts/catalog/chart_catalog.go
+++ b/pkg/ts/catalog/chart_catalog.go
@@ -1880,6 +1880,19 @@ var charts = []sectionDescription{
},
},
},
+ {
+ Organization: [][]string{{SQLLayer, "Contention"}},
+ Charts: []chartDescription{
+ {
+ Title: "Transaction ID Cache Miss",
+ Metrics: []string{"sql.contention.txn_id_cache.miss"},
+ },
+ {
+ Title: "Transaction ID Cache Read",
+ Metrics: []string{"sql.contention.txn_id_cache.read"},
+ },
+ },
+ },
{
Organization: [][]string{{SQLLayer, "Bulk"}},
Charts: []chartDescription{