From 96a6decd8bee2dd240c1cc64acc048e78a424cc9 Mon Sep 17 00:00:00 2001 From: Azhng Date: Thu, 11 Nov 2021 15:07:58 -0500 Subject: [PATCH 1/2] sql: introduce Transaction ID Cache Previously, it was impossible to correlate an individual execution of a transaction (identified via transaction ID) to its historical execution statistics (identified via transaction fingerprint ID). This commit introduces the Transaction ID Cache (TxnIDCache), a FIFO cache that stores the mapping from transaction ID to transaction fingerprint ID. This buffer records the mapping at the end of transaction execution. The oldest entry in the buffer is evicted according to the FIFO policy. The size of the Transaction ID Cache is capped at 64 MB by default and is configurable via the sql.contention.txn_id_cache.max_size cluster setting. Release note (sql change): Transaction ID to Transaction Fingerprint ID mapping is now stored in the new Transaction ID Cache, a FIFO unordered in-memory buffer. The size of the buffer is 64 MB by default and configurable via the sql.contention.txn_id_cache.max_size cluster setting. Consequently, two additional metrics are introduced: * sql.contention.txn_id_cache.size: tracks the current memory usage of the Transaction ID Cache * sql.contention.txn_id_cache.discarded_count: number of resolved transaction IDs that are dropped due to memory constraints. --- .../settings/settings-for-tenants.txt | 1 + docs/generated/settings/settings.html | 1 + pkg/BUILD.bazel | 1 + pkg/roachpb/app_stats.go | 3 + pkg/sql/BUILD.bazel | 7 +- pkg/sql/conn_executor.go | 47 ++- pkg/sql/conn_executor_exec.go | 25 +- pkg/sql/conn_fsm.go | 102 +++++-- pkg/sql/contention/txnidcache/BUILD.bazel | 53 ++++ .../contention/txnidcache/cluster_settings.go | 21 ++ .../txnidcache/concurrent_write_buffer.go | 118 ++++++++ pkg/sql/contention/txnidcache/main_test.go | 29 ++ pkg/sql/contention/txnidcache/metrics.go | 43 +++ pkg/sql/contention/txnidcache/txn_id_cache.go | 245 +++++++++++++++ .../txnidcache/txn_id_cache_test.go | 282 ++++++++++++++++++ pkg/sql/contention/txnidcache/writer.go | 67 +++++ pkg/sql/exec_util.go | 8 + pkg/sql/pgwire/server.go | 1 + pkg/sql/txn_state.go | 41 ++- pkg/sql/txn_state_test.go | 124 +++++--- pkg/sql/txnevent_string.go | 27 -- pkg/sql/txneventtype_string.go | 27 ++ pkg/ts/catalog/chart_catalog.go | 13 + 23 files changed, 1164 insertions(+), 122 deletions(-) create mode 100644 pkg/sql/contention/txnidcache/BUILD.bazel create mode 100644 pkg/sql/contention/txnidcache/cluster_settings.go create mode 100644 pkg/sql/contention/txnidcache/concurrent_write_buffer.go create mode 100644 pkg/sql/contention/txnidcache/main_test.go create mode 100644 pkg/sql/contention/txnidcache/metrics.go create mode 100644 pkg/sql/contention/txnidcache/txn_id_cache.go create mode 100644 pkg/sql/contention/txnidcache/txn_id_cache_test.go create mode 100644 pkg/sql/contention/txnidcache/writer.go delete mode 100644 pkg/sql/txnevent_string.go create mode 100644 pkg/sql/txneventtype_string.go diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 115a3027bc5b..409b4d010477 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -81,6 +81,7 @@ server.web_session.purge.max_deletions_per_cycle integer 10 the maximum number o server.web_session.purge.period duration 1h0m0s the time until old sessions are deleted server.web_session.purge.ttl duration 1h0m0s if nonzero, entries in system.web_sessions older
than this duration are periodically purged server.web_session_timeout duration 168h0m0s the duration that a newly created web session will be valid +sql.contention.txn_id_cache.max_size byte size 64 MiB the maximum byte size TxnID cache will use sql.cross_db_fks.enabled boolean false if true, creating foreign key references across databases is allowed sql.cross_db_sequence_owners.enabled boolean false if true, creating sequences owned by tables from other databases is allowed sql.cross_db_sequence_references.enabled boolean false if true, sequences referenced by tables from other databases are allowed diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 3ca78510e519..f63361501995 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -88,6 +88,7 @@ server.web_session.purge.periodduration1h0m0sthe time until old sessions are deleted server.web_session.purge.ttlduration1h0m0sif nonzero, entries in system.web_sessions older than this duration are periodically purged server.web_session_timeoutduration168h0m0sthe duration that a newly created web session will be valid +sql.contention.txn_id_cache.max_sizebyte size64 MiBthe maximum byte size TxnID cache will use sql.cross_db_fks.enabledbooleanfalseif true, creating foreign key references across databases is allowed sql.cross_db_sequence_owners.enabledbooleanfalseif true, creating sequences owned by tables from other databases is allowed sql.cross_db_sequence_references.enabledbooleanfalseif true, sequences referenced by tables from other databases are allowed diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 0725b8bcc394..eed4b0827394 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -238,6 +238,7 @@ ALL_TESTS = [ "//pkg/sql/colflow/colrpc:colrpc_test", "//pkg/sql/colflow:colflow_test", "//pkg/sql/colmem:colmem_test", + "//pkg/sql/contention/txnidcache:txnidcache_test", "//pkg/sql/contention:contention_test", "//pkg/sql/covering:covering_test", "//pkg/sql/distsql:distsql_test", diff --git a/pkg/roachpb/app_stats.go b/pkg/roachpb/app_stats.go index 2722c831ae44..e5c6f02187b1 100644 --- a/pkg/roachpb/app_stats.go +++ b/pkg/roachpb/app_stats.go @@ -60,6 +60,9 @@ func ConstructStatementFingerprintID( // individual statement fingerprint IDs that comprise the transaction. type TransactionFingerprintID uint64 +// InvalidTransactionFingerprintID denotes an invalid transaction fingerprint ID. +const InvalidTransactionFingerprintID = TransactionFingerprintID(0) + // Size returns the size of the TransactionFingerprintID. 
func (t TransactionFingerprintID) Size() int64 { return 8 diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 1aa07cb9d66d..cfe7be587e88 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -238,7 +238,7 @@ go_library( "zone_config.go", ":gen-advancecode-stringer", # keep ":gen-nodestatus-stringer", # keep - ":gen-txnevent-stringer", # keep + ":gen-txneventtype-stringer", # keep ":gen-txntype-stringer", # keep ], importpath = "github.com/cockroachdb/cockroach/pkg/sql", @@ -311,6 +311,7 @@ go_library( "//pkg/sql/colexec", "//pkg/sql/colflow", "//pkg/sql/contention", + "//pkg/sql/contention/txnidcache", "//pkg/sql/covering", "//pkg/sql/delegate", "//pkg/sql/descmetadata", @@ -701,9 +702,9 @@ go_test( ) stringer( - name = "gen-txnevent-stringer", + name = "gen-txneventtype-stringer", src = "txn_state.go", - typ = "txnEvent", + typ = "txnEventType", ) stringer( diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index f0055419269d..10b69d66c6a8 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache" "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/idxusage" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" @@ -279,6 +280,10 @@ type Server struct { // node as gateway node. indexUsageStats *idxusage.LocalIndexUsageStats + // txnIDCache stores the mapping from transaction ID to transaction + // fingerprint IDs for all recently executed transactions. + txnIDCache *txnidcache.Cache + // Metrics is used to account normal queries. Metrics Metrics @@ -319,6 +324,10 @@ type Metrics struct { type ServerMetrics struct { // StatsMetrics contains metrics for SQL statistics collection. StatsMetrics StatsMetrics + + // ContentionSubsystemMetrics contains metrics related to contention + // subsystem. + ContentionSubsystemMetrics txnidcache.Metrics } // NewServer creates a new Server. Start() needs to be called before the Server @@ -361,6 +370,9 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server { ChannelSize: idxusage.DefaultChannelSize, Setting: cfg.Settings, }), + txnIDCache: txnidcache.NewTxnIDCache( + cfg.Settings, + &serverMetrics.ContentionSubsystemMetrics), } telemetryLoggingMetrics := &TelemetryLoggingMetrics{} @@ -451,6 +463,7 @@ func makeServerMetrics(cfg *ExecutorConfig) ServerMetrics { MetaSQLTxnStatsCollectionOverhead, 6*metricsSampleInterval, ), }, + ContentionSubsystemMetrics: txnidcache.NewMetrics(), } } @@ -462,6 +475,8 @@ func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) { // accumulated in the reporter when the telemetry server fails. // Usually it is telemetry's reporter's job to clear the reporting SQL Stats. s.reportedStats.Start(ctx, stopper) + + s.txnIDCache.Start(ctx, stopper) } // GetSQLStatsController returns the persistedsqlstats.Controller for current @@ -487,6 +502,11 @@ func (s *Server) GetReportedSQLStatsController() *sslocal.Controller { return s.reportedStatsController } +// GetTxnIDCache returns the txnidcache.Cache for the current sql.Server. +func (s *Server) GetTxnIDCache() *txnidcache.Cache { + return s.txnIDCache +} + // GetScrubbedStmtStats returns the statement statistics by app, with the // queries scrubbed of their identifiers. 
Any statements which cannot be // scrubbed will be omitted from the returned map. @@ -814,6 +834,7 @@ func (s *Server) newConnExecutor( hasCreatedTemporarySchema: false, stmtDiagnosticsRecorder: s.cfg.StmtDiagnosticsRecorder, indexUsageStats: s.indexUsageStats, + txnIDCacheWriter: s.txnIDCache, } ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount @@ -985,9 +1006,9 @@ func (ex *connExecutor) closeWrapper(ctx context.Context, recovered interface{}) func (ex *connExecutor) close(ctx context.Context, closeType closeType) { ex.sessionEventf(ctx, "finishing connExecutor") - txnEv := noEvent + txnEvType := noEvent if _, noTxn := ex.machine.CurState().(stateNoTxn); !noTxn { - txnEv = txnRollback + txnEvType = txnRollback } if closeType == normalClose { @@ -1019,7 +1040,7 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) { ex.state.finishExternalTxn() } - if err := ex.resetExtraTxnState(ctx, txnEv); err != nil { + if err := ex.resetExtraTxnState(ctx, txnEvent{eventType: txnEvType}); err != nil { log.Warningf(ctx, "error while cleaning up connExecutor: %s", err) } @@ -1401,6 +1422,10 @@ type connExecutor struct { // indexUsageStats is used to track index usage stats. indexUsageStats *idxusage.LocalIndexUsageStats + + // txnIDCacheWriter is used to write txnidcache.ResolvedTxnID to the + // Transaction ID Cache. + txnIDCacheWriter txnidcache.Writer } // ctxHolder contains a connection's context and, while session tracing is @@ -1516,7 +1541,9 @@ func (ns *prepStmtNamespace) resetTo( } // resetExtraTxnState resets the fields of ex.extraTxnState when a transaction -// commits, rolls back or restarts. +// finishes execution (either commits, rollbacks or restarts). Based on the +// transaction event, resetExtraTxnState invokes corresponding callbacks +// (e.g. onTxnFinish() and onTxnRestart()). func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) error { ex.extraTxnState.jobs = nil ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{} @@ -1536,7 +1563,7 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) err delete(ex.extraTxnState.prepStmtsNamespace.portals, name) } - switch ev { + switch ev.eventType { case txnCommit, txnRollback: for name, p := range ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.portals { p.close(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name) @@ -2046,7 +2073,7 @@ func (ex *connExecutor) updateTxnRewindPosMaybe( if _, ok := ex.machine.CurState().(stateOpen); !ok { return nil } - if advInfo.txnEvent == txnStart || advInfo.txnEvent == txnRestart { + if advInfo.txnEvent.eventType == txnStart || advInfo.txnEvent.eventType == txnRestart { var nextPos CmdPos switch advInfo.code { case stayInPlace: @@ -2228,7 +2255,7 @@ func (ex *connExecutor) execCopyIn( } else { txnOpt = copyTxnOpt{ resetExtraTxnState: func(ctx context.Context) error { - return ex.resetExtraTxnState(ctx, noEvent) + return ex.resetExtraTxnState(ctx, txnEvent{eventType: noEvent}) }, } } @@ -2680,7 +2707,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( } // Handle transaction events which cause updates to txnState. 
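Editor's note, for orientation while reading the conn_executor.go hunks below: the patch turns the old txnEvent enum into a struct that also carries the ID of the transaction the event refers to, so call sites like the switch that follows now branch on ev.eventType. A minimal sketch of the new shape, mirroring the definitions added in pkg/sql/txn_state.go in this patch; handleEvent is a hypothetical helper used only for illustration, and the constant order beyond noEvent/txnStart is illustrative:

    // Sketch only; the real definitions live in pkg/sql/txn_state.go, and the
    // uuid type is pkg/util/uuid.UUID.
    type txnEventType int

    const (
        noEvent txnEventType = iota
        txnStart
        txnCommit
        txnRollback
        txnRestart
    )

    type txnEvent struct {
        eventType txnEventType
        txnID     uuid.UUID // set when a transaction starts, commits, or rolls back
    }

    // handleEvent illustrates how call sites now branch on eventType and read
    // the transaction ID off the event.
    func handleEvent(ev txnEvent) {
        switch ev.eventType {
        case txnStart:
            // ev.txnID identifies the transaction that was just created.
        case txnCommit, txnRollback:
            // ev.txnID identifies the transaction that just finished; this is
            // the key recorded in the Transaction ID Cache.
        }
    }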
- switch advInfo.txnEvent { + switch advInfo.txnEvent.eventType { case noEvent: _, nextStateIsAborted := ex.machine.CurState().(stateAborted) // Update the deadline on the transaction based on the collections, @@ -2696,7 +2723,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( case txnStart: ex.extraTxnState.autoRetryCounter = 0 ex.extraTxnState.autoRetryReason = nil - ex.recordTransactionStart() + ex.recordTransactionStart(advInfo.txnEvent.txnID) // Bump the txn counter for logging. ex.extraTxnState.txnCounter++ if !ex.server.cfg.Codec.ForSystemTenant() { @@ -2719,7 +2746,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( err := errorutil.UnexpectedWithIssueErrorf( 26687, "programming error: non-error event %s generated even though res.Err() has been set to: %s", - errors.Safe(advInfo.txnEvent.String()), + errors.Safe(advInfo.txnEvent.eventType.String()), res.Err()) log.Errorf(ex.Ctx(), "%v", err) errorutil.SendReport(ex.Ctx(), &ex.server.cfg.Settings.SV, err) diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index be3f4bde3ae2..b4442a62683d 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain" @@ -51,6 +52,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/lib/pq/oid" ) @@ -1881,7 +1883,14 @@ func payloadHasError(payload fsm.EventPayload) bool { } // recordTransactionStart records the start of the transaction. -func (ex *connExecutor) recordTransactionStart() { +func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) { + // Transaction fingerprint ID will be available once transaction finishes + // execution. 
+ ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{ + TxnID: txnID, + TxnFingerprintID: roachpb.InvalidTransactionFingerprintID, + }) + ex.state.mu.RLock() txnStart := ex.state.mu.txnStart ex.state.mu.RUnlock() @@ -1934,6 +1943,13 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) { transactionFingerprintID, ) } + if ex.server.cfg.TestingKnobs.BeforeTxnStatsRecorded != nil { + ex.server.cfg.TestingKnobs.BeforeTxnStatsRecorded( + ex.sessionData(), + ev.txnID, + transactionFingerprintID, + ) + } err := ex.recordTransaction(ctx, transactionFingerprintID, ev, implicit, txnStart) if err != nil { if log.V(1) { @@ -1991,13 +2007,18 @@ func (ex *connExecutor) recordTransaction( ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1) ex.metrics.EngineMetrics.SQLTxnLatency.RecordValue(txnTime.Nanoseconds()) + ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{ + TxnID: ev.txnID, + TxnFingerprintID: transactionFingerprintID, + }) + txnServiceLat := ex.phaseTimes.GetTransactionServiceLatency() txnRetryLat := ex.phaseTimes.GetTransactionRetryLatency() commitLat := ex.phaseTimes.GetCommitLatency() recordedTxnStats := sqlstats.RecordedTxnStats{ TransactionTimeSec: txnTime.Seconds(), - Committed: ev == txnCommit, + Committed: ev.eventType == txnCommit, ImplicitTxn: implicit, RetryCount: int64(ex.extraTxnState.autoRetryCounter), StatementFingerprintIDs: ex.extraTxnState.transactionStatementFingerprintIDs, diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go index b9f44c86e8e7..26521abd0265 100644 --- a/pkg/sql/conn_fsm.go +++ b/pkg/sql/conn_fsm.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlfsm" "github.com/cockroachdb/cockroach/pkg/util/fsm" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/uuid" ) // Constants for the String() representation of the session states. Shared with @@ -199,6 +200,10 @@ func (eventRetriableErr) Event() {} func (eventTxnRestart) Event() {} func (eventTxnReleased) Event() {} +// Other constants. + +var emptyTxnID = uuid.UUID{} + // TxnStateTransitions describe the transitions used by a connExecutor's // fsm.Machine. Args.Extended is a txnState, which is muted by the Actions. 
// @@ -231,7 +236,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ Next: stateNoTxn{}, Action: func(args fsm.Args) error { ts := args.Extended.(*txnState) - ts.setAdvanceInfo(skipBatch, noRewind, noEvent) + ts.setAdvanceInfo(skipBatch, noRewind, txnEvent{eventType: noEvent}) return nil }, }, @@ -284,7 +289,8 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ args.Extended.(*txnState).setAdvanceInfo( rewind, args.Payload.(eventRetriableErrPayload).rewCap, - txnRestart) + txnEvent{eventType: txnRestart}, + ) return nil }, }, @@ -306,7 +312,7 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ Next: stateAborted{}, Action: func(args fsm.Args) error { ts := args.Extended.(*txnState) - ts.setAdvanceInfo(skipBatch, noRewind, noEvent) + ts.setAdvanceInfo(skipBatch, noRewind, txnEvent{eventType: noEvent}) return nil }, }, @@ -316,7 +322,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ Description: "ROLLBACK TO SAVEPOINT cockroach_restart", Next: stateOpen{ImplicitTxn: fsm.False}, Action: func(args fsm.Args) error { - args.Extended.(*txnState).setAdvanceInfo(advanceOne, noRewind, txnRestart) + args.Extended.(*txnState).setAdvanceInfo( + advanceOne, + noRewind, + txnEvent{eventType: txnRestart}, + ) return nil }, }, @@ -325,7 +335,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ Action: func(args fsm.Args) error { // Note: Preparing the KV txn for restart has already happened by this // point. - args.Extended.(*txnState).setAdvanceInfo(skipBatch, noRewind, noEvent) + args.Extended.(*txnState).setAdvanceInfo( + skipBatch, + noRewind, + txnEvent{eventType: noEvent}, + ) return nil }, }, @@ -333,7 +347,15 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ Description: "RELEASE SAVEPOINT cockroach_restart", Next: stateCommitWait{}, Action: func(args fsm.Args) error { - args.Extended.(*txnState).setAdvanceInfo(advanceOne, noRewind, txnCommit) + ts := args.Extended.(*txnState) + ts.mu.Lock() + txnID := ts.mu.txn.ID() + ts.mu.Unlock() + ts.setAdvanceInfo( + advanceOne, + noRewind, + txnEvent{eventType: txnCommit, txnID: txnID}, + ) return nil }, }, @@ -361,7 +383,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ Description: "any other statement", Next: stateAborted{}, Action: func(args fsm.Args) error { - args.Extended.(*txnState).setAdvanceInfo(skipBatch, noRewind, noEvent) + args.Extended.(*txnState).setAdvanceInfo( + skipBatch, + noRewind, + txnEvent{eventType: noEvent}, + ) return nil }, }, @@ -377,7 +403,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) success", Next: stateOpen{ImplicitTxn: fsm.False}, Action: func(args fsm.Args) error { - args.Extended.(*txnState).setAdvanceInfo(advanceOne, noRewind, noEvent) + args.Extended.(*txnState).setAdvanceInfo( + advanceOne, + noRewind, + txnEvent{eventType: noEvent}, + ) return nil }, }, @@ -387,7 +417,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ Description: "ROLLBACK TO SAVEPOINT (not cockroach_restart) failed because txn needs restart", Next: stateAborted{}, Action: func(args fsm.Args) error { - args.Extended.(*txnState).setAdvanceInfo(skipBatch, noRewind, noEvent) + args.Extended.(*txnState).setAdvanceInfo( + skipBatch, + noRewind, + txnEvent{eventType: noEvent}, + ) return nil }, }, @@ -396,7 +430,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ Description: "ROLLBACK TO SAVEPOINT cockroach_restart", Next: stateOpen{ImplicitTxn: fsm.False}, Action: func(args fsm.Args) error { - 
args.Extended.(*txnState).setAdvanceInfo(advanceOne, noRewind, txnRestart) + args.Extended.(*txnState).setAdvanceInfo( + advanceOne, + noRewind, + txnEvent{eventType: txnRestart}, + ) return nil }, }, @@ -421,7 +459,11 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{ Description: "any other statement", Next: stateCommitWait{}, Action: func(args fsm.Args) error { - args.Extended.(*txnState).setAdvanceInfo(skipBatch, noRewind, noEvent) + args.Extended.(*txnState).setAdvanceInfo( + skipBatch, + noRewind, + txnEvent{eventType: noEvent}, + ) return nil }, }, @@ -445,7 +487,7 @@ func noTxnToOpen(args fsm.Args) error { advCode = stayInPlace } - ts.resetForNewSQLTxn( + newTxnID := ts.resetForNewSQLTxn( connCtx, txnTyp, payload.txnSQLTimestamp, @@ -455,15 +497,19 @@ func noTxnToOpen(args fsm.Args) error { nil, /* txn */ payload.tranCtx, ) - ts.setAdvanceInfo(advCode, noRewind, txnStart) + ts.setAdvanceInfo( + advCode, + noRewind, + txnEvent{eventType: txnStart, txnID: newTxnID}, + ) return nil } // finishTxn finishes the transaction. It also calls setAdvanceInfo() with the // given event. -func (ts *txnState) finishTxn(ev txnEvent) error { - ts.finishSQLTxn() - ts.setAdvanceInfo(advanceOne, noRewind, ev) +func (ts *txnState) finishTxn(ev txnEventType) error { + finishedTxnID := ts.finishSQLTxn() + ts.setAdvanceInfo(advanceOne, noRewind, txnEvent{eventType: ev, txnID: finishedTxnID}) return nil } @@ -471,8 +517,12 @@ func (ts *txnState) finishTxn(ev txnEvent) error { func cleanupAndFinishOnError(args fsm.Args) error { ts := args.Extended.(*txnState) ts.mu.txn.CleanupOnError(ts.Ctx, args.Payload.(payloadWithError).errorCause()) - ts.finishSQLTxn() - ts.setAdvanceInfo(skipBatch, noRewind, txnRollback) + finishedTxnID := ts.finishSQLTxn() + ts.setAdvanceInfo( + skipBatch, + noRewind, + txnEvent{eventType: txnRollback, txnID: finishedTxnID}, + ) return nil } @@ -488,8 +538,12 @@ var BoundTxnStateTransitions = fsm.Compile(fsm.Pattern{ Next: stateInternalError{}, Action: func(args fsm.Args) error { ts := args.Extended.(*txnState) - ts.finishSQLTxn() - ts.setAdvanceInfo(skipBatch, noRewind, txnRollback) + finishedTxnID := ts.finishSQLTxn() + ts.setAdvanceInfo( + skipBatch, + noRewind, + txnEvent{eventType: txnRollback, txnID: finishedTxnID}, + ) return nil }, }, @@ -497,8 +551,12 @@ var BoundTxnStateTransitions = fsm.Compile(fsm.Pattern{ Next: stateInternalError{}, Action: func(args fsm.Args) error { ts := args.Extended.(*txnState) - ts.finishSQLTxn() - ts.setAdvanceInfo(skipBatch, noRewind, txnRollback) + finishedTxnID := ts.finishSQLTxn() + ts.setAdvanceInfo( + skipBatch, + noRewind, + txnEvent{eventType: txnRollback, txnID: finishedTxnID}, + ) return nil }, }, diff --git a/pkg/sql/contention/txnidcache/BUILD.bazel b/pkg/sql/contention/txnidcache/BUILD.bazel new file mode 100644 index 000000000000..87685dab0f76 --- /dev/null +++ b/pkg/sql/contention/txnidcache/BUILD.bazel @@ -0,0 +1,53 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "txnidcache", + srcs = [ + "cluster_settings.go", + "concurrent_write_buffer.go", + "metrics.go", + "txn_id_cache.go", + "writer.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/contention/txnidcache", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb:with-mocks", + "//pkg/settings", + "//pkg/settings/cluster", + "//pkg/util/cache", + "//pkg/util/encoding", + "//pkg/util/metric", + "//pkg/util/stop", + "//pkg/util/syncutil", + "//pkg/util/uuid", + ], +) + +go_test( + name = "txnidcache_test", 
+ srcs = [ + "main_test.go", + "txn_id_cache_test.go", + ], + deps = [ + "//pkg/kv", + "//pkg/roachpb:with-mocks", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/sql", + "//pkg/sql/sessiondata", + "//pkg/sql/tests", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/syncutil", + "//pkg/util/uuid", + "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/sql/contention/txnidcache/cluster_settings.go b/pkg/sql/contention/txnidcache/cluster_settings.go new file mode 100644 index 000000000000..c2fbf3c47474 --- /dev/null +++ b/pkg/sql/contention/txnidcache/cluster_settings.go @@ -0,0 +1,21 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import "github.com/cockroachdb/cockroach/pkg/settings" + +// MaxSize limits the maximum byte size can be used by the TxnIDCache. +var MaxSize = settings.RegisterByteSizeSetting( + settings.TenantWritable, + `sql.contention.txn_id_cache.max_size`, + "the maximum byte size TxnID cache will use", + 64*1024*1024, // 64 MB +).WithPublic() diff --git a/pkg/sql/contention/txnidcache/concurrent_write_buffer.go b/pkg/sql/contention/txnidcache/concurrent_write_buffer.go new file mode 100644 index 000000000000..2827afd1eb8a --- /dev/null +++ b/pkg/sql/contention/txnidcache/concurrent_write_buffer.go @@ -0,0 +1,118 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import ( + "sync" + "sync/atomic" + + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +const messageBlockSize = 1024 + +type messageBlock [messageBlockSize]ResolvedTxnID + +// concurrentWriteBuffer is a data structure that optimizes for concurrent +// writes and also implements the Writer interface. +// +// Any write requests initially starts by holding a read lock (flushSyncLock) +// and then reserves an index to the messageBlock (a fixed-length array). If the +// reserved index is valid, concurrentWriteBuffer immediately writes to the +// array at the reserved index. However, if the reserved index is not valid, +// (that is, array index out of bound), there are two scenarios: +// 1. If the reserved index == size of the array, then the caller of Record() +// method is responsible for flushing the entire array into the channel. The +// caller does so by upgrading the read-lock to a write-lock, therefore +// blocks all future writers from writing to the shared array. After the +// flush is performed, the write-lock is then downgraded to a read-lock. +// 2. If the reserved index > size of the array, then the caller of Record() +// is blocked until the array is flushed. This is achieved by waiting on the +// conditional variable (flushDone) while holding onto the read-lock. 
After +// the flush is completed, the writer is unblocked and allowed to retry. +type concurrentWriteBuffer struct { + flushSyncLock syncutil.RWMutex + flushDone sync.Cond + + msgBlockPool *sync.Pool + + // msgBlock is the temporary buffer that concurrentWriteBuffer uses to batch + // write requests before sending them into the channel. + msgBlock *messageBlock + + // atomicIdx is the index pointing into the fixed-length array within the + // msgBlock.This should only be accessed using atomic package. + atomicIdx int64 + + // sink is the flush target that concurrentWriteBuffer flushes to once + // msgBlock is full. + sink messageSink +} + +var _ Writer = &concurrentWriteBuffer{} + +// newConcurrentWriteBuffer returns a new instance of concurrentWriteBuffer. +func newConcurrentWriteBuffer(sink messageSink, msgBlockPool *sync.Pool) *concurrentWriteBuffer { + writeBuffer := &concurrentWriteBuffer{ + sink: sink, + msgBlockPool: msgBlockPool, + msgBlock: msgBlockPool.Get().(*messageBlock), + } + writeBuffer.flushDone.L = writeBuffer.flushSyncLock.RLocker() + return writeBuffer +} + +// Record records a mapping from txnID to its corresponding transaction +// fingerprint ID. Record is safe to be used concurrently. +func (c *concurrentWriteBuffer) Record(resolvedTxnID ResolvedTxnID) { + c.flushSyncLock.RLock() + defer c.flushSyncLock.RUnlock() + for { + reservedIdx := c.reserveMsgBlockIndex() + if reservedIdx < messageBlockSize { + c.msgBlock[reservedIdx] = resolvedTxnID + return + } else if reservedIdx == messageBlockSize { + c.flushMsgBlockToChannelRLocked() + } else { + c.flushDone.Wait() + } + } +} + +// Flush flushes concurrentWriteBuffer into the channel. It implements the +// txnidcache.Writer interface. +func (c *concurrentWriteBuffer) Flush() { + c.flushSyncLock.Lock() + c.flushMsgBlockToChannelLocked() + c.flushSyncLock.Unlock() +} + +func (c *concurrentWriteBuffer) flushMsgBlockToChannelRLocked() { + // We upgrade the read-lock to a write-lock, then when we are done flushing, + // the lock is downgraded to a read-lock. + c.flushSyncLock.RUnlock() + defer c.flushSyncLock.RLock() + c.flushSyncLock.Lock() + defer c.flushSyncLock.Unlock() + c.flushMsgBlockToChannelLocked() +} + +func (c *concurrentWriteBuffer) flushMsgBlockToChannelLocked() { + c.sink.push(c.msgBlock) + c.msgBlock = c.msgBlockPool.Get().(*messageBlock) + c.flushDone.Broadcast() + atomic.StoreInt64(&c.atomicIdx, 0) +} + +func (c *concurrentWriteBuffer) reserveMsgBlockIndex() int64 { + return atomic.AddInt64(&c.atomicIdx, 1) - 1 // since array is 0-indexed. +} diff --git a/pkg/sql/contention/txnidcache/main_test.go b/pkg/sql/contention/txnidcache/main_test.go new file mode 100644 index 000000000000..327d352a4d3b --- /dev/null +++ b/pkg/sql/contention/txnidcache/main_test.go @@ -0,0 +1,29 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
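Editor's note: the doc comment on concurrentWriteBuffer above describes a reserve-an-index-then-write scheme. As a rough mental model only (deliberately simplified: a single mutex instead of the shared read lock, the atomic index reservation, and the flushDone condition variable), batching records into a fixed-size block before handing it to a sink looks like this:

    package main

    import (
        "fmt"
        "sync"
    )

    const blockSize = 4 // the real messageBlockSize is 1024

    // fixedBuffer is an illustrative stand-in for concurrentWriteBuffer: it
    // batches records and hands each full block to flushFn.
    type fixedBuffer struct {
        mu      sync.Mutex
        block   [blockSize]int
        idx     int
        flushFn func(block [blockSize]int, n int)
    }

    func (b *fixedBuffer) record(v int) {
        b.mu.Lock()
        defer b.mu.Unlock()
        b.block[b.idx] = v
        b.idx++
        if b.idx == blockSize {
            b.flushFn(b.block, b.idx) // block is full: flush and start over
            b.idx = 0
        }
    }

    func main() {
        b := &fixedBuffer{flushFn: func(block [blockSize]int, n int) {
            fmt.Println("flushed block:", block[:n])
        }}
        for i := 1; i <= 10; i++ {
            b.record(i)
        }
    }

The real implementation avoids taking an exclusive lock on every Record by reserving a slot with atomic.AddInt64 while holding only the read lock, and takes the write lock solely for the flush itself.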
+ +package txnidcache_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +func TestMain(m *testing.M) { + security.SetAssetLoader(securitytest.EmbeddedAssets) + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} diff --git a/pkg/sql/contention/txnidcache/metrics.go b/pkg/sql/contention/txnidcache/metrics.go new file mode 100644 index 000000000000..35a6736b3cfc --- /dev/null +++ b/pkg/sql/contention/txnidcache/metrics.go @@ -0,0 +1,43 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import "github.com/cockroachdb/cockroach/pkg/util/metric" + +// Metrics is a structs that include all metrics related to contention +// subsystem. +type Metrics struct { + CacheMissCounter *metric.Counter + CacheReadCounter *metric.Counter +} + +var _ metric.Struct = Metrics{} + +// MetricStruct implements the metric.Struct interface. +func (Metrics) MetricStruct() {} + +// NewMetrics returns a new instance of Metrics. +func NewMetrics() Metrics { + return Metrics{ + CacheMissCounter: metric.NewCounter(metric.Metadata{ + Name: "sql.contention.txn_id_cache.miss", + Help: "Number of cache misses", + Measurement: "Cache miss", + Unit: metric.Unit_COUNT, + }), + CacheReadCounter: metric.NewCounter(metric.Metadata{ + Name: "sql.contention.txn_id_cache.read", + Help: "Number of cache read", + Measurement: "Cache read", + Unit: metric.Unit_COUNT, + }), + } +} diff --git a/pkg/sql/contention/txnidcache/txn_id_cache.go b/pkg/sql/contention/txnidcache/txn_id_cache.go new file mode 100644 index 000000000000..9384ed704430 --- /dev/null +++ b/pkg/sql/contention/txnidcache/txn_id_cache.go @@ -0,0 +1,245 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import ( + "context" + "sync" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/cache" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" +) + +// Reader is the interface that can be used to query the transaction fingerprint +// ID of a transaction ID. +type Reader interface { + // Lookup returns the corresponding transaction fingerprint ID for a given txnID, + // if the given txnID has no entry in the Cache, the returned "found" boolean + // will be false. 
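Editor's note, as a small aside on the counters registered in metrics.go above: a hit rate can be derived from the read and miss counters. The helper below is hypothetical (not part of the patch) and assumes the Count() accessor on util/metric's Counter:

    // hitRate derives a cache hit ratio from the counters in Metrics.
    // Illustrative only; not part of this patch.
    func hitRate(m Metrics) float64 {
        reads := m.CacheReadCounter.Count()
        if reads == 0 {
            return 0
        }
        return 1 - float64(m.CacheMissCounter.Count())/float64(reads)
    }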
+ Lookup(txnID uuid.UUID) (result roachpb.TransactionFingerprintID, found bool) +} + +// Writer is the interface that can be used to write to txnidcache. +type Writer interface { + // Record writes a pair of transactionID and transaction fingerprint ID + // into a temporary buffer. This buffer will eventually be flushed into + // the transaction ID cache asynchronously. + Record(resolvedTxnID ResolvedTxnID) + + // Flush starts the flushing process of writer's temporary buffer. + Flush() +} + +type messageSink interface { + // push allows a messageBlock to be pushed into the pusher. + push(*messageBlock) +} + +const channelSize = 128 + +// Cache stores the mapping from the Transaction IDs (UUID) of recently +// executed transactions to their corresponding Transaction Fingerprint ID (uint64). +// The size of Cache is controlled via sql.contention.txn_id_cache.max_size +// cluster setting, and it follows FIFO eviction policy once the cache size +// reaches the limit defined by the cluster setting. +// +// Cache's overall architecture is as follows: +// +------------------------------------------------------------+ +// | connExecutor --------* | +// | | writes resolvedTxnID to Writer | +// | v | +// | +---------------------------------------------------+ | +// | | Writer | | +// | | | | +// | | Writer contains multiple shards of concurrent | | +// | | write buffer. Each incoming resolvedTxnID is | | +// | | first hashed to a corresponding shard, and then | | +// | | is written to the concurrent write buffer | | +// | | backing that shard. Once the concurrent write | | +// | | buffer is full, a flush is performed and the | | +// | | content of the buffer is send into the channel. | | +// | | | | +// | | +------------+ | | +// | | | shard1 | | | +// | | +------------+ | | +// | | | shard2 | | | +// | | +------------+ | | +// | | | shard3 | | | +// | | +------------+ | | +// | | | ..... | | | +// | | | ..... | | | +// | | +------------+ | | +// | | | shard128 | | | +// | | +------------+ | | +// | | | | +// | +-----+---------------------------------------------+ | +// +------------|-----------------------------------------------+ +// | +// | +// V +// channel +// ^ +// | +// Cache polls the channel using a goroutine and push the +// | messageBlock into its storage. +// | +// +----------------------------------+ +// | Cache: | +// | The cache contains a | +// | FIFO buffer backed by | +// | cache.UnorderedCache | +// +----------------------------------+ +type Cache struct { + st *cluster.Settings + + msgChan chan *messageBlock + closeCh chan struct{} + + mu struct { + syncutil.RWMutex + + store *cache.UnorderedCache + } + + messageBlockPool *sync.Pool + + writer Writer + + metrics *Metrics +} + +var ( + entrySize = int64(uuid.UUID{}.Size()) + + roachpb.TransactionFingerprintID(0).Size() +) + +// ResolvedTxnID represents a TxnID that is resolved to its corresponding +// TxnFingerprintID. +type ResolvedTxnID struct { + TxnID uuid.UUID + TxnFingerprintID roachpb.TransactionFingerprintID +} + +func (r *ResolvedTxnID) valid() bool { + return r.TxnID != uuid.UUID{} +} + +var ( + _ Reader = &Cache{} + _ Writer = &Cache{} + _ messageSink = &Cache{} +) + +// NewTxnIDCache creates a new instance of Cache. 
+func NewTxnIDCache(st *cluster.Settings, metrics *Metrics) *Cache { + t := &Cache{ + st: st, + metrics: metrics, + msgChan: make(chan *messageBlock, channelSize), + closeCh: make(chan struct{}), + } + + t.messageBlockPool = &sync.Pool{ + New: func() interface{} { + return &messageBlock{} + }, + } + + t.mu.store = cache.NewUnorderedCache(cache.Config{ + Policy: cache.CacheFIFO, + ShouldEvict: func(size int, _, _ interface{}) bool { + return int64(size)*entrySize > MaxSize.Get(&st.SV) + }, + }) + + t.writer = newWriter(t, t.messageBlockPool) + return t +} + +// Start implements the Provider interface. +func (t *Cache) Start(ctx context.Context, stopper *stop.Stopper) { + addBlockToStore := func(msgBlock *messageBlock) { + t.mu.Lock() + defer t.mu.Unlock() + for blockIdx := range msgBlock { + if !msgBlock[blockIdx].valid() { + break + } + t.mu.store.Add(msgBlock[blockIdx].TxnID, msgBlock[blockIdx].TxnFingerprintID) + } + } + + consumeBlock := func(b *messageBlock) { + addBlockToStore(b) + *b = messageBlock{} + t.messageBlockPool.Put(b) + } + + err := stopper.RunAsyncTask(ctx, "txn-id-cache-ingest", func(ctx context.Context) { + for { + select { + case msgBlock := <-t.msgChan: + consumeBlock(msgBlock) + case <-stopper.ShouldQuiesce(): + close(t.closeCh) + return + } + } + }) + if err != nil { + close(t.closeCh) + } +} + +// Lookup implements the Reader interface. +func (t *Cache) Lookup(txnID uuid.UUID) (result roachpb.TransactionFingerprintID, found bool) { + t.metrics.CacheReadCounter.Inc(1) + + t.mu.RLock() + defer t.mu.RUnlock() + + txnFingerprintID, found := t.mu.store.Get(txnID) + if !found { + t.metrics.CacheMissCounter.Inc(1) + return roachpb.InvalidTransactionFingerprintID, found + } + + return txnFingerprintID.(roachpb.TransactionFingerprintID), found +} + +// Record implements the Writer interface. +func (t *Cache) Record(resolvedTxnID ResolvedTxnID) { + t.writer.Record(resolvedTxnID) +} + +// push implements the messageSink interface. +func (t *Cache) push(msg *messageBlock) { + select { + case t.msgChan <- msg: + case <-t.closeCh: + } +} + +// Flush flushes the resolved txn IDs in the Writer into the Cache. +func (t *Cache) Flush() { + t.writer.Flush() +} + +// Size return the current size of the Cache. +func (t *Cache) Size() int64 { + t.mu.RLock() + defer t.mu.RUnlock() + return int64(t.mu.store.Len()) * entrySize +} diff --git a/pkg/sql/contention/txnidcache/txn_id_cache_test.go b/pkg/sql/contention/txnidcache/txn_id_cache_test.go new file mode 100644 index 000000000000..4e70c0a5ee9f --- /dev/null +++ b/pkg/sql/contention/txnidcache/txn_id_cache_test.go @@ -0,0 +1,282 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
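Editor's note: putting the pieces of txn_id_cache.go together, the intended call pattern from the executor's side looks roughly like the sketch below. recordProvisionalAndResolved is a hypothetical function used only for illustration; stopper and cluster-settings wiring are omitted.

    // recordProvisionalAndResolved sketches the two-step write performed by the
    // connExecutor: a provisional entry at transaction start that is later
    // replaced by the resolved fingerprint ID when the transaction finishes.
    func recordProvisionalAndResolved(
        c *Cache, txnID uuid.UUID, fingerprintID roachpb.TransactionFingerprintID,
    ) {
        // At transaction start the fingerprint ID is not yet known.
        c.Record(ResolvedTxnID{
            TxnID:            txnID,
            TxnFingerprintID: roachpb.InvalidTransactionFingerprintID,
        })

        // At transaction finish the resolved fingerprint ID is recorded under
        // the same transaction ID.
        c.Record(ResolvedTxnID{
            TxnID:            txnID,
            TxnFingerprintID: fingerprintID,
        })

        // Flush pushes partially filled buffers toward the ingestion goroutine
        // started in Cache.Start; tests call this explicitly.
        c.Flush()

        // Reads go through the Reader interface; found is false if the entry
        // has not been ingested yet or has already been evicted.
        if got, found := c.Lookup(txnID); found {
            _ = got
        }
    }

Since each entry is a 16-byte UUID plus an 8-byte fingerprint ID (the entrySize computed above), the ShouldEvict check caps the cache at roughly 64 MiB / 24 B, about 2.8 million entries, under the default sql.contention.txn_id_cache.max_size.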
+ +package txnidcache_test + +import ( + "context" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestTransactionIDCache tests the correctness of the txnidcache.Cache. +func TestTransactionIDCache(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + params, _ := tests.CreateTestServerParams() + + appName := "txnIDCacheTest" + expectedTxnIDToUUIDMapping := make(map[uuid.UUID]roachpb.TransactionFingerprintID) + injector := runtimeHookInjector{} + + injector.setHook(func( + sessionData *sessiondata.SessionData, + txnID uuid.UUID, + txnFingerprintID roachpb.TransactionFingerprintID, + ) { + if strings.Contains(sessionData.ApplicationName, appName) { + expectedTxnIDToUUIDMapping[txnID] = txnFingerprintID + } + }) + + params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{ + BeforeTxnStatsRecorded: injector.hook, + } + + testServer, sqlConn, kvDB := serverutils.StartServer(t, params) + defer func() { + require.NoError(t, sqlConn.Close()) + testServer.Stopper().Stop(ctx) + }() + + testConn := sqlutils.MakeSQLRunner(sqlConn) + + // Set the cache size limit to a very generous amount to prevent premature + // eviction. + testConn.Exec(t, "SET CLUSTER SETTING sql.contention.txn_id_cache.max_size = '1GB'") + + testConn.Exec(t, "CREATE DATABASE txnIDTest") + testConn.Exec(t, "USE txnIDTest") + testConn.Exec(t, "CREATE TABLE t AS SELECT generate_series(1, 10)") + testConn.Exec(t, "SET application_name = $1", appName) + + testCases := []struct { + stmts []string + explicit bool + }{ + // Implicit transactions that will have same statement fingerprint. + { + stmts: []string{"SELECT 1"}, + explicit: false, + }, + { + stmts: []string{"SELECT 2"}, + explicit: false, + }, + + // Implicit transaction that have different statement fingerprints. + { + stmts: []string{"SELECT 1, 1"}, + explicit: false, + }, + { + stmts: []string{"SELECT 1, 1, 2"}, + explicit: false, + }, + + // Explicit Transactions. + { + stmts: []string{"SELECT 1"}, + explicit: true, + }, + { + stmts: []string{"SELECT 5"}, + explicit: true, + }, + { + stmts: []string{"SELECT 5", "SELECT 6, 7"}, + explicit: true, + }, + } + + // Send test statements into both regular SQL connection and internal + // executor to test both code paths. + for _, tc := range testCases { + if tc.explicit { + testConn.Exec(t, "BEGIN") + } + { + for _, stmt := range tc.stmts { + testConn.Exec(t, stmt) + } + } + if tc.explicit { + testConn.Exec(t, "COMMIT") + } + } + + ie := testServer.InternalExecutor().(*sql.InternalExecutor) + + for _, tc := range testCases { + // Send statements one by one since internal executor doesn't support + // sending a batch of statements. 
+ for _, stmt := range tc.stmts { + var txn *kv.Txn + if tc.explicit { + txn = kvDB.NewTxn(ctx, "") + } + _, err := ie.QueryRowEx( + ctx, + appName, + txn, + sessiondata.InternalExecutorOverride{ + User: security.RootUserName(), + }, + stmt, + ) + require.NoError(t, err) + if tc.explicit { + require.NoError(t, txn.Commit(ctx)) + } + + require.NoError(t, err) + } + } + + // Ensure we have intercepted all transactions, the expected size is + // calculated as: + // # stmt executed in regular executor + // + # stmt executed in internal executor + // + 1 (`SET application_name` executed previously in regular SQL Conn) + // - 3 (explicit txns executed in internal executor due to + // https://github.com/cockroachdb/cockroach/issues/73091) + expectedTxnIDCacheSize := len(testCases)*2 + 1 - 3 + require.Equal(t, expectedTxnIDCacheSize, len(expectedTxnIDToUUIDMapping)) + + sqlServer := testServer.SQLServer().(*sql.Server) + txnIDCache := sqlServer.GetTxnIDCache() + + txnIDCache.Flush() + t.Run("resolved_txn_id_cache_record", func(t *testing.T) { + testutils.SucceedsWithin(t, func() error { + for txnID, expectedTxnFingerprintID := range expectedTxnIDToUUIDMapping { + actualTxnFingerprintID, ok := txnIDCache.Lookup(txnID) + if !ok { + return errors.Newf("expected to find txn(%s) with fingerprintID: "+ + "%d, but it was not found.", + txnID, expectedTxnFingerprintID, + ) + } + if expectedTxnFingerprintID != actualTxnFingerprintID { + return errors.Newf("expected to find txn(%s) with fingerprintID: %d, but the actual fingerprintID is: %d", txnID, expectedTxnFingerprintID, actualTxnFingerprintID) + } + } + return nil + }, 3*time.Second) + + sizePreEviction := txnIDCache.Size() + testConn.Exec(t, "SET CLUSTER SETTING sql.contention.txn_id_cache.max_size = '10B'") + + // Execute additional queries to ensure we are overflowing the size limit. 
+ testConn.Exec(t, "SELECT 1") + txnIDCache.Flush() + + testutils.SucceedsWithin(t, func() error { + sizePostEviction := txnIDCache.Size() + if sizePostEviction >= sizePreEviction { + return errors.Newf("expected txn id cache size to shrink below %d, "+ + "but it has increased to %d", sizePreEviction, sizePostEviction) + } + return nil + }, 3*time.Second) + }) + + t.Run("provisional_txn_id_cache_record", func(t *testing.T) { + testConn.Exec(t, "RESET CLUSTER SETTING sql.contention.txn_id_cache.max_size") + callCaptured := uint32(0) + + injector.setHook(func( + sessionData *sessiondata.SessionData, + txnID uuid.UUID, + txnFingerprintID roachpb.TransactionFingerprintID) { + if strings.Contains(sessionData.ApplicationName, appName) { + if txnFingerprintID != roachpb.InvalidTransactionFingerprintID { + txnIDCache.Flush() + + testutils.SucceedsWithin(t, func() error { + existingTxnFingerprintID, ok := txnIDCache.Lookup(txnID) + if !ok { + return errors.Newf("expected provision txn fingerprint id to be found for "+ + "txn(%s), but it was not", txnID) + } + if existingTxnFingerprintID != roachpb.InvalidTransactionFingerprintID { + return errors.Newf("expected txn (%s) to have a provisional"+ + "txn fingerprint id, but this txn already has a resolved "+ + "txn fingerprint id: %d", txnID, existingTxnFingerprintID) + } + return nil + }, 3*time.Second) + atomic.StoreUint32(&callCaptured, 1) + } + } + }) + + testConn.Exec(t, "BEGIN") + testConn.Exec(t, "SELECT 1") + testConn.Exec(t, "COMMIT") + + require.NotZerof(t, atomic.LoadUint32(&callCaptured), + "expected to found provisional txn id cache record, "+ + "but it was not found") + }) +} + +// runtimeHookInjector provides a way to dynamically inject a testing knobs +// into a running cluster. +type runtimeHookInjector struct { + syncutil.RWMutex + op func( + sessionData *sessiondata.SessionData, + txnID uuid.UUID, + txnFingerprintID roachpb.TransactionFingerprintID, + ) +} + +func (s *runtimeHookInjector) hook( + sessionData *sessiondata.SessionData, + txnID uuid.UUID, + txnFingerprintID roachpb.TransactionFingerprintID, +) { + s.RLock() + defer s.RUnlock() + s.op(sessionData, txnID, txnFingerprintID) +} + +func (s *runtimeHookInjector) setHook( + op func( + sessionData *sessiondata.SessionData, + txnID uuid.UUID, + txnFingerprintID roachpb.TransactionFingerprintID, + ), +) { + s.Lock() + defer s.Unlock() + s.op = op +} diff --git a/pkg/sql/contention/txnidcache/writer.go b/pkg/sql/contention/txnidcache/writer.go new file mode 100644 index 000000000000..f86bc6bd3916 --- /dev/null +++ b/pkg/sql/contention/txnidcache/writer.go @@ -0,0 +1,67 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package txnidcache + +import ( + "sync" + + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/uuid" +) + +// There is no strong reason why shardCount is 16 beyond that Java's +// ConcurrentHashMap also uses 16 shards and has reasonably good performance. 
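Editor's note: shard selection in writer.go comes down to hashing the transaction UUID onto one of shardCount buffers; the patch decodes the leading eight bytes of the UUID with encoding.DecodeUint64Descending and takes the result modulo shardCount. A self-contained sketch of the same idea, with github.com/google/uuid standing in for pkg/util/uuid:

    package main

    import (
        "encoding/binary"
        "fmt"

        "github.com/google/uuid" // stand-in for pkg/util/uuid in this sketch
    )

    const shardCount = 16

    // shardFor maps a transaction UUID onto a shard index in [0, shardCount).
    func shardFor(id uuid.UUID) int {
        return int(binary.BigEndian.Uint64(id[:8]) % shardCount)
    }

    func main() {
        id := uuid.New()
        fmt.Println("txn", id, "maps to shard", shardFor(id))
    }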
+const shardCount = 16 + +type writer struct { + shards [shardCount]*concurrentWriteBuffer + + sink messageSink + msgBlockPool *sync.Pool +} + +var _ Writer = &writer{} + +func newWriter(sink messageSink, msgBlockPool *sync.Pool) *writer { + w := &writer{ + sink: sink, + msgBlockPool: msgBlockPool, + } + + for shardIdx := 0; shardIdx < shardCount; shardIdx++ { + w.shards[shardIdx] = newConcurrentWriteBuffer(sink, msgBlockPool) + } + + return w +} + +// Record implements the Writer interface. +func (w *writer) Record(resolvedTxnID ResolvedTxnID) { + shardIdx := hashTxnID(resolvedTxnID.TxnID) + buffer := w.shards[shardIdx] + buffer.Record(resolvedTxnID) +} + +// Flush implements the Writer interface. +func (w *writer) Flush() { + for shardIdx := 0; shardIdx < shardCount; shardIdx++ { + w.shards[shardIdx].Flush() + } +} + +func hashTxnID(txnID uuid.UUID) int { + b := txnID.GetBytes() + _, val, err := encoding.DecodeUint64Descending(b) + if err != nil { + panic(err) + } + return int(val % shardCount) +} diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index c6fe3d88ab8f..aa431afcce2e 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1408,6 +1408,14 @@ type ExecutorTestingKnobs struct { // OnTxnRetry, if set, will be called if there is a transaction retry. OnTxnRetry func(autoRetryReason error, evalCtx *tree.EvalContext) + + // BeforeTxnStatsRecorded, if set, will be called before the statistics + // of a transaction is being recorded. + BeforeTxnStatsRecorded func( + sessionData *sessiondata.SessionData, + txnID uuid.UUID, + txnFingerprintID roachpb.TransactionFingerprintID, + ) } // PGWireTestingKnobs contains knobs for the pgwire module. diff --git a/pkg/sql/pgwire/server.go b/pkg/sql/pgwire/server.go index 84b6e6e06213..0caa909af8cd 100644 --- a/pkg/sql/pgwire/server.go +++ b/pkg/sql/pgwire/server.go @@ -379,6 +379,7 @@ func (s *Server) Metrics() (res []interface{}) { &s.SQLServer.InternalMetrics.EngineMetrics, &s.SQLServer.InternalMetrics.GuardrailMetrics, &s.SQLServer.ServerMetrics.StatsMetrics, + &s.SQLServer.ServerMetrics.ContentionSubsystemMetrics, } } diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index a83bc8c924a9..61033952fb6b 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "go.opentelemetry.io/otel/attribute" ) @@ -131,7 +132,8 @@ const ( ) // resetForNewSQLTxn (re)initializes the txnState for a new transaction. -// It creates a new client.Txn and initializes it using the session defaults. +// It creates a new client.Txn and initializes it using the session defaults +// and returns the ID of the new transaction. // // connCtx: The context in which the new transaction is started (usually a // connection's context). ts.Ctx will be set to a child context and should be @@ -156,7 +158,7 @@ func (ts *txnState) resetForNewSQLTxn( readOnly tree.ReadWriteMode, txn *kv.Txn, tranCtx transitionCtx, -) { +) (txnID uuid.UUID) { // Reset state vars to defaults. 
ts.sqlTimestamp = sqlTimestamp ts.isHistorical = false @@ -204,6 +206,7 @@ func (ts *txnState) resetForNewSQLTxn( } ts.mu.txn = txn } + txnID = ts.mu.txn.ID() sp.SetTag("txn", attribute.StringValue(ts.mu.txn.ID().String())) ts.mu.txnStart = timeutil.Now() ts.mu.Unlock() @@ -215,12 +218,15 @@ func (ts *txnState) resetForNewSQLTxn( if err := ts.setReadOnlyMode(readOnly); err != nil { panic(err) } + + return txnID } // finishSQLTxn finalizes a transaction's results and closes the root span for // the current SQL txn. This needs to be called before resetForNewSQLTxn() is -// called for starting another SQL txn. -func (ts *txnState) finishSQLTxn() { +// called for starting another SQL txn. The ID of the finalized transaction is +// returned. +func (ts *txnState) finishSQLTxn() (txnID uuid.UUID) { ts.mon.Stop(ts.Ctx) if ts.cancel != nil { ts.cancel() @@ -238,10 +244,12 @@ func (ts *txnState) finishSQLTxn() { sp.Finish() ts.Ctx = nil ts.mu.Lock() + txnID = ts.mu.txn.ID() ts.mu.txn = nil ts.mu.txnStart = time.Time{} ts.mu.Unlock() ts.recordingThreshold = 0 + return txnID } // finishExternalTxn is a stripped-down version of finishSQLTxn used by @@ -344,15 +352,26 @@ const ( ) // txnEvent is part of advanceInfo, informing the connExecutor about some -// transaction events. It is used by the connExecutor to clear state associated -// with a SQL transaction (other than the state encapsulated in TxnState; e.g. -// schema changes and portals). -// -//go:generate stringer -type=txnEvent -type txnEvent int +// transaction events. +type txnEvent struct { + // eventType is used by the connExecutor to clear state associated + // with a SQL transaction (other than the state encapsulated in TxnState; e.g. + // schema changes and portals). + eventType txnEventType + + // txnID is filled when a transaction starts, commits or aborts. + // When a transaction starts, txnID is set to the ID of the transaction that + // was created. + // When a transaction commits or aborts, txnID is set to the ID of the + // transaction that just finished execution. + txnID uuid.UUID +} + +//go:generate stringer -type=txnEventType +type txnEventType int const ( - noEvent txnEvent = iota + noEvent txnEventType = iota // txnStart means that the statement that just ran started a new transaction. // Note that when a transaction is restarted, txnStart event is not emitted. diff --git a/pkg/sql/txn_state_test.go b/pkg/sql/txn_state_test.go index 77c4992895ef..b7d810db0f80 100644 --- a/pkg/sql/txn_state_test.go +++ b/pkg/sql/txn_state_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/gogo/protobuf/proto" ) @@ -142,7 +143,9 @@ func (tc *testContext) createNoTxnState() (fsm.State, *txnState) { // checkAdv returns an error if adv does not match all the expected fields. // // Pass noRewindExpected for expRewPos if a rewind is not expected. 
-func checkAdv(adv advanceInfo, expCode advanceCode, expRewPos CmdPos, expEv txnEvent) error { +func checkAdv( + adv advanceInfo, expCode advanceCode, expRewPos CmdPos, expEv txnEventType, expTxnID uuid.UUID, +) error { if adv.code != expCode { return errors.Errorf("expected code: %s, but got: %s (%+v)", expCode, adv.code, adv) } @@ -155,8 +158,25 @@ func checkAdv(adv advanceInfo, expCode advanceCode, expRewPos CmdPos, expEv txnE return errors.Errorf("expected rewind to %d, but got: %+v", expRewPos, adv) } } - if expEv != adv.txnEvent { - return errors.Errorf("expected txnEvent: %s, got: %s", expEv, adv.txnEvent) + if expEv != adv.txnEvent.eventType { + return errors.Errorf("expected txnEventType: %s, got: %s", expEv, adv.txnEvent.eventType) + } + + // The txnID field in advanceInfo is only checked when either: + // * txnStart: the txnID in this case should be not be empty. + // * txnCommit, txnRollback: the txnID in this case should equal to the + // transaction that just finished execution. + switch expEv { + case txnStart: + if adv.txnEvent.txnID.Equal(emptyTxnID) { + return errors.Errorf( + "expected txnID not to be empty, but it is") + } + case txnCommit, txnRollback: + if adv.txnEvent.txnID != expTxnID { + return errors.Errorf( + "expected txnID: %s, got: %s", expTxnID, adv.txnEvent.txnID) + } } return nil } @@ -223,7 +243,7 @@ func TestTransitions(t *testing.T) { type expAdvance struct { expCode advanceCode - expEv txnEvent + expEv txnEventType } txnName := sqlTxnName @@ -236,7 +256,9 @@ func TestTransitions(t *testing.T) { // A function used to init the txnState to the desired state before the // transition. The returned State and txnState are to be used to initialize // a Machine. - init func() (fsm.State, *txnState, error) + // If the initialized txnState contains an active transaction, its + // transaction ID is also returned. Else, emptyTxnID is returned. + init func() (fsm.State, *txnState, uuid.UUID, error) // The event to deliver to the state machine. ev fsm.Event @@ -263,9 +285,9 @@ func TestTransitions(t *testing.T) { { // Start an implicit txn from NoTxn. name: "NoTxn->Starting (implicit txn)", - init: func() (fsm.State, *txnState, error) { + init: func() (fsm.State, *txnState, uuid.UUID, error) { s, ts := testCon.createNoTxnState() - return s, ts, nil + return s, ts, emptyTxnID, nil }, ev: eventTxnStart{ImplicitTxn: fsm.True}, evPayload: makeEventTxnStartPayload(pri, tree.ReadWrite, timeutil.Now(), @@ -288,9 +310,9 @@ func TestTransitions(t *testing.T) { { // Start an explicit txn from NoTxn. name: "NoTxn->Starting (explicit txn)", - init: func() (fsm.State, *txnState, error) { + init: func() (fsm.State, *txnState, uuid.UUID, error) { s, ts := testCon.createNoTxnState() - return s, ts, nil + return s, ts, emptyTxnID, nil }, ev: eventTxnStart{ImplicitTxn: fsm.False}, evPayload: makeEventTxnStartPayload(pri, tree.ReadWrite, timeutil.Now(), @@ -314,14 +336,14 @@ func TestTransitions(t *testing.T) { { // Finish an implicit txn. name: "Open (implicit) -> NoTxn", - init: func() (fsm.State, *txnState, error) { + init: func() (fsm.State, *txnState, uuid.UUID, error) { s, ts := testCon.createOpenState(implicitTxn) // We commit the KV transaction, as that's done by the layer below // txnState. 
if err := ts.mu.txn.Commit(ts.Ctx); err != nil { - return nil, nil, err + return nil, nil, emptyTxnID, err } - return s, ts, nil + return s, ts, ts.mu.txn.ID(), nil }, ev: eventTxnFinishCommitted{}, evPayload: nil, @@ -335,14 +357,14 @@ func TestTransitions(t *testing.T) { { // Finish an explicit txn. name: "Open (explicit) -> NoTxn", - init: func() (fsm.State, *txnState, error) { + init: func() (fsm.State, *txnState, uuid.UUID, error) { s, ts := testCon.createOpenState(explicitTxn) // We commit the KV transaction, as that's done by the layer below // txnState. if err := ts.mu.txn.Commit(ts.Ctx); err != nil { - return nil, nil, err + return nil, nil, emptyTxnID, err } - return s, ts, nil + return s, ts, ts.mu.txn.ID(), nil }, ev: eventTxnFinishCommitted{}, evPayload: nil, @@ -356,9 +378,9 @@ func TestTransitions(t *testing.T) { { // Get a retriable error while we can auto-retry. name: "Open + auto-retry", - init: func() (fsm.State, *txnState, error) { + init: func() (fsm.State, *txnState, uuid.UUID, error) { s, ts := testCon.createOpenState(explicitTxn) - return s, ts, nil + return s, ts, ts.mu.txn.ID(), nil }, evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) { b := eventRetriableErrPayload{ @@ -380,9 +402,9 @@ func TestTransitions(t *testing.T) { // except this time the error is on a COMMIT. This shouldn't make any // difference; we should still auto-retry like the above. name: "Open + auto-retry (COMMIT)", - init: func() (fsm.State, *txnState, error) { + init: func() (fsm.State, *txnState, uuid.UUID, error) { s, ts := testCon.createOpenState(explicitTxn) - return s, ts, nil + return s, ts, ts.mu.txn.ID(), nil }, evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) { b := eventRetriableErrPayload{ @@ -403,9 +425,9 @@ func TestTransitions(t *testing.T) { // Get a retriable error when we can no longer auto-retry, but the client // is doing client-side retries. name: "Open + client retry", - init: func() (fsm.State, *txnState, error) { + init: func() (fsm.State, *txnState, uuid.UUID, error) { s, ts := testCon.createOpenState(explicitTxn) - return s, ts, nil + return s, ts, ts.mu.txn.ID(), nil }, evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) { b := eventRetriableErrPayload{ @@ -430,9 +452,9 @@ func TestTransitions(t *testing.T) { // done a RELEASE such that COMMIT couldn't get retriable errors), and so // we can't go to RestartWait. name: "Open + client retry + error on COMMIT", - init: func() (fsm.State, *txnState, error) { + init: func() (fsm.State, *txnState, uuid.UUID, error) { s, ts := testCon.createOpenState(explicitTxn) - return s, ts, nil + return s, ts, ts.mu.txn.ID(), nil }, evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) { b := eventRetriableErrPayload{ @@ -452,9 +474,9 @@ func TestTransitions(t *testing.T) { { // An error on COMMIT leaves us in NoTxn, not in Aborted. name: "Open + non-retriable error on COMMIT", - init: func() (fsm.State, *txnState, error) { + init: func() (fsm.State, *txnState, uuid.UUID, error) { s, ts := testCon.createOpenState(explicitTxn) - return s, ts, nil + return s, ts, ts.mu.txn.ID(), nil }, ev: eventNonRetriableErr{IsCommit: fsm.True}, evPayload: eventNonRetriableErrPayload{err: fmt.Errorf("test non-retriable err")}, @@ -470,9 +492,9 @@ func TestTransitions(t *testing.T) { // Like the above, but this time with an implicit txn: we get a retriable // error, but we can't auto-retry. We expect to go to NoTxn. 
name: "Open + useless retriable error (implicit)", - init: func() (fsm.State, *txnState, error) { + init: func() (fsm.State, *txnState, uuid.UUID, error) { s, ts := testCon.createOpenState(implicitTxn) - return s, ts, nil + return s, ts, ts.mu.txn.ID(), nil }, evFun: func(ts *txnState) (fsm.Event, fsm.EventPayload) { b := eventRetriableErrPayload{ @@ -492,9 +514,9 @@ func TestTransitions(t *testing.T) { { // We get a non-retriable error. name: "Open + non-retriable error", - init: func() (fsm.State, *txnState, error) { + init: func() (fsm.State, *txnState, uuid.UUID, error) { s, ts := testCon.createOpenState(explicitTxn) - return s, ts, nil + return s, ts, ts.mu.txn.ID(), nil }, ev: eventNonRetriableErr{IsCommit: fsm.False}, evPayload: eventNonRetriableErrPayload{err: fmt.Errorf("test non-retriable err")}, @@ -508,11 +530,11 @@ func TestTransitions(t *testing.T) { { // We go to CommitWait (after a RELEASE SAVEPOINT). name: "Open->CommitWait", - init: func() (fsm.State, *txnState, error) { + init: func() (fsm.State, *txnState, uuid.UUID, error) { s, ts := testCon.createOpenState(explicitTxn) // Simulate what execution does before generating this event. err := ts.mu.txn.Commit(ts.Ctx) - return s, ts, err + return s, ts, ts.mu.txn.ID(), err }, ev: eventTxnReleased{}, expState: stateCommitWait{}, @@ -525,9 +547,9 @@ func TestTransitions(t *testing.T) { { // Restarting from Open via ROLLBACK TO SAVEPOINT. name: "Open + restart", - init: func() (fsm.State, *txnState, error) { + init: func() (fsm.State, *txnState, uuid.UUID, error) { s, ts := testCon.createOpenState(explicitTxn) - return s, ts, nil + return s, ts, ts.mu.txn.ID(), nil }, ev: eventTxnRestart{}, expState: stateOpen{ImplicitTxn: fsm.False}, @@ -545,9 +567,9 @@ func TestTransitions(t *testing.T) { { // The txn finished, such as after a ROLLBACK. name: "Aborted->NoTxn", - init: func() (fsm.State, *txnState, error) { + init: func() (fsm.State, *txnState, uuid.UUID, error) { s, ts := testCon.createAbortedState() - return s, ts, nil + return s, ts, ts.mu.txn.ID(), nil }, ev: eventTxnFinishAborted{}, expState: stateNoTxn{}, @@ -560,9 +582,9 @@ func TestTransitions(t *testing.T) { { // The txn is starting again (ROLLBACK TO SAVEPOINT while in Aborted). name: "Aborted->Open", - init: func() (fsm.State, *txnState, error) { + init: func() (fsm.State, *txnState, uuid.UUID, error) { s, ts := testCon.createAbortedState() - return s, ts, nil + return s, ts, ts.mu.txn.ID(), nil }, ev: eventSavepointRollback{}, expState: stateOpen{ImplicitTxn: fsm.False}, @@ -575,9 +597,9 @@ func TestTransitions(t *testing.T) { { // The txn is starting again (ROLLBACK TO SAVEPOINT cockroach_restart while in Aborted). name: "Aborted->Restart", - init: func() (fsm.State, *txnState, error) { + init: func() (fsm.State, *txnState, uuid.UUID, error) { s, ts := testCon.createAbortedState() - return s, ts, nil + return s, ts, ts.mu.txn.ID(), nil }, ev: eventTxnRestart{}, expState: stateOpen{ImplicitTxn: fsm.False}, @@ -597,9 +619,9 @@ func TestTransitions(t *testing.T) { // Verify that the historical timestamp from the evPayload is propagated // to the expTxn. 
name: "Aborted->Starting (historical)", - init: func() (fsm.State, *txnState, error) { + init: func() (fsm.State, *txnState, uuid.UUID, error) { s, ts := testCon.createAbortedState() - return s, ts, nil + return s, ts, ts.mu.txn.ID(), nil }, ev: eventTxnRestart{}, expState: stateOpen{ImplicitTxn: fsm.False}, @@ -616,8 +638,12 @@ func TestTransitions(t *testing.T) { // { name: "CommitWait->NoTxn", - init: func() (fsm.State, *txnState, error) { - return testCon.createCommitWaitState() + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts, err := testCon.createCommitWaitState() + if err != nil { + return nil, nil, emptyTxnID, err + } + return s, ts, ts.mu.txn.ID(), nil }, ev: eventTxnFinishCommitted{}, expState: stateNoTxn{}, @@ -629,8 +655,12 @@ func TestTransitions(t *testing.T) { }, { name: "CommitWait + err", - init: func() (fsm.State, *txnState, error) { - return testCon.createCommitWaitState() + init: func() (fsm.State, *txnState, uuid.UUID, error) { + s, ts, err := testCon.createCommitWaitState() + if err != nil { + return nil, nil, emptyTxnID, err + } + return s, ts, ts.mu.txn.ID(), nil }, ev: eventNonRetriableErr{IsCommit: fsm.False}, evPayload: eventNonRetriableErrPayload{err: fmt.Errorf("test non-retriable err")}, @@ -645,7 +675,7 @@ func TestTransitions(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { // Get the initial state. - s, ts, err := tc.init() + s, ts, expectedTxnID, err := tc.init() if err != nil { t.Fatal(err) } @@ -673,7 +703,7 @@ func TestTransitions(t *testing.T) { expRewPos = dummyRewCap.rewindPos } if err := checkAdv( - adv, tc.expAdv.expCode, expRewPos, tc.expAdv.expEv, + adv, tc.expAdv.expCode, expRewPos, tc.expAdv.expEv, expectedTxnID, ); err != nil { t.Fatal(err) } diff --git a/pkg/sql/txnevent_string.go b/pkg/sql/txnevent_string.go deleted file mode 100644 index 1590c65df04c..000000000000 --- a/pkg/sql/txnevent_string.go +++ /dev/null @@ -1,27 +0,0 @@ -// Code generated by "stringer"; DO NOT EDIT. - -package sql - -import "strconv" - -func _() { - // An "invalid array index" compiler error signifies that the constant values have changed. - // Re-run the stringer command to generate them again. - var x [1]struct{} - _ = x[noEvent-0] - _ = x[txnStart-1] - _ = x[txnCommit-2] - _ = x[txnRollback-3] - _ = x[txnRestart-4] -} - -const _txnEvent_name = "noEventtxnStarttxnCommittxnRollbacktxnRestart" - -var _txnEvent_index = [...]uint8{0, 7, 15, 24, 35, 45} - -func (i txnEvent) String() string { - if i < 0 || i >= txnEvent(len(_txnEvent_index)-1) { - return "txnEvent(" + strconv.FormatInt(int64(i), 10) + ")" - } - return _txnEvent_name[_txnEvent_index[i]:_txnEvent_index[i+1]] -} diff --git a/pkg/sql/txneventtype_string.go b/pkg/sql/txneventtype_string.go new file mode 100644 index 000000000000..91e7607ca6e7 --- /dev/null +++ b/pkg/sql/txneventtype_string.go @@ -0,0 +1,27 @@ +// Code generated by "stringer"; DO NOT EDIT. + +package sql + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. 
+ var x [1]struct{} + _ = x[noEvent-0] + _ = x[txnStart-1] + _ = x[txnCommit-2] + _ = x[txnRollback-3] + _ = x[txnRestart-4] +} + +const _txnEventType_name = "noEventtxnStarttxnCommittxnRollbacktxnRestart" + +var _txnEventType_index = [...]uint8{0, 7, 15, 24, 35, 45} + +func (i txnEventType) String() string { + if i < 0 || i >= txnEventType(len(_txnEventType_index)-1) { + return "txnEventType(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _txnEventType_name[_txnEventType_index[i]:_txnEventType_index[i+1]] +} diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index ee6f68183af9..f8ed23dec07d 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -1880,6 +1880,19 @@ var charts = []sectionDescription{ }, }, }, + { + Organization: [][]string{{SQLLayer, "Contention"}}, + Charts: []chartDescription{ + { + Title: "Transaction ID Cache Miss", + Metrics: []string{"sql.contention.txn_id_cache.miss"}, + }, + { + Title: "Transaction ID Cache Read", + Metrics: []string{"sql.contention.txn_id_cache.read"}, + }, + }, + }, { Organization: [][]string{{SQLLayer, "Bulk"}}, Charts: []chartDescription{ From a545cdf02fd857f38e9bc6f2b38b2a173fae181c Mon Sep 17 00:00:00 2001 From: Azhng Date: Mon, 31 Jan 2022 18:42:43 +0000 Subject: [PATCH 2/2] sql: rename recordTransaction to recordTransactionFinish Previously, we had a pair of functions called recordTransactionStart() and recordTransaction(). These two functions were called at the beginning and the end of a transaction. This commit renames recordTransaction() to recordTransactionFinish() to reflect when this function is called, and moves it right next to recordTransactionStart(). Release note: None --- pkg/sql/conn_executor_exec.go | 98 +++++++++++++++++------------------ 1 file changed, 49 insertions(+), 49 deletions(-) diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index b4442a62683d..191846f373cc 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -1882,53 +1882,6 @@ func payloadHasError(payload fsm.EventPayload) bool { return hasErr } -// recordTransactionStart records the start of the transaction. -func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) { - // Transaction fingerprint ID will be available once transaction finishes - // execution. - ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{ - TxnID: txnID, - TxnFingerprintID: roachpb.InvalidTransactionFingerprintID, - }) - - ex.state.mu.RLock() - txnStart := ex.state.mu.txnStart - ex.state.mu.RUnlock() - implicit := ex.implicitTxn() - - // Transaction received time is the time at which the statement that prompted - // the creation of this transaction was received.
- ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionTransactionReceived, - ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionQueryReceived)) - ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionFirstStartExecTransaction, timeutil.Now()) - ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionMostRecentStartExecTransaction, - ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionFirstStartExecTransaction)) - ex.extraTxnState.transactionStatementsHash = util.MakeFNV64() - ex.extraTxnState.transactionStatementFingerprintIDs = nil - ex.extraTxnState.numRows = 0 - ex.extraTxnState.shouldCollectTxnExecutionStats = false - ex.extraTxnState.accumulatedStats = execstats.QueryLevelStats{} - ex.extraTxnState.rowsRead = 0 - ex.extraTxnState.bytesRead = 0 - ex.extraTxnState.rowsWritten = 0 - ex.extraTxnState.rowsWrittenLogged = false - ex.extraTxnState.rowsReadLogged = false - if txnExecStatsSampleRate := collectTxnStatsSampleRate.Get(&ex.server.GetExecutorConfig().Settings.SV); txnExecStatsSampleRate > 0 { - ex.extraTxnState.shouldCollectTxnExecutionStats = txnExecStatsSampleRate > ex.rng.Float64() - } - - ex.metrics.EngineMetrics.SQLTxnsOpen.Inc(1) - - ex.extraTxnState.shouldExecuteOnTxnFinish = true - ex.extraTxnState.txnFinishClosure.txnStartTime = txnStart - ex.extraTxnState.txnFinishClosure.implicit = implicit - ex.extraTxnState.shouldExecuteOnTxnRestart = true - - if !implicit { - ex.statsCollector.StartExplicitTransaction() - } -} - func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) { if ex.extraTxnState.shouldExecuteOnTxnFinish { ex.extraTxnState.shouldExecuteOnTxnFinish = false @@ -1950,7 +1903,7 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) { transactionFingerprintID, ) } - err := ex.recordTransaction(ctx, transactionFingerprintID, ev, implicit, txnStart) + err := ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart) if err != nil { if log.V(1) { log.Warningf(ctx, "failed to record transaction stats: %s", err) @@ -1979,7 +1932,54 @@ func (ex *connExecutor) onTxnRestart() { } } -func (ex *connExecutor) recordTransaction( +// recordTransactionStart records the start of the transaction. +func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) { + // Transaction fingerprint ID will be available once transaction finishes + // execution. + ex.txnIDCacheWriter.Record(txnidcache.ResolvedTxnID{ + TxnID: txnID, + TxnFingerprintID: roachpb.InvalidTransactionFingerprintID, + }) + + ex.state.mu.RLock() + txnStart := ex.state.mu.txnStart + ex.state.mu.RUnlock() + implicit := ex.implicitTxn() + + // Transaction received time is the time at which the statement that prompted + // the creation of this transaction was received. 
+ ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionTransactionReceived, + ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionQueryReceived)) + ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionFirstStartExecTransaction, timeutil.Now()) + ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionMostRecentStartExecTransaction, + ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionFirstStartExecTransaction)) + ex.extraTxnState.transactionStatementsHash = util.MakeFNV64() + ex.extraTxnState.transactionStatementFingerprintIDs = nil + ex.extraTxnState.numRows = 0 + ex.extraTxnState.shouldCollectTxnExecutionStats = false + ex.extraTxnState.accumulatedStats = execstats.QueryLevelStats{} + ex.extraTxnState.rowsRead = 0 + ex.extraTxnState.bytesRead = 0 + ex.extraTxnState.rowsWritten = 0 + ex.extraTxnState.rowsWrittenLogged = false + ex.extraTxnState.rowsReadLogged = false + if txnExecStatsSampleRate := collectTxnStatsSampleRate.Get(&ex.server.GetExecutorConfig().Settings.SV); txnExecStatsSampleRate > 0 { + ex.extraTxnState.shouldCollectTxnExecutionStats = txnExecStatsSampleRate > ex.rng.Float64() + } + + ex.metrics.EngineMetrics.SQLTxnsOpen.Inc(1) + + ex.extraTxnState.shouldExecuteOnTxnFinish = true + ex.extraTxnState.txnFinishClosure.txnStartTime = txnStart + ex.extraTxnState.txnFinishClosure.implicit = implicit + ex.extraTxnState.shouldExecuteOnTxnRestart = true + + if !implicit { + ex.statsCollector.StartExplicitTransaction() + } +} + +func (ex *connExecutor) recordTransactionFinish( ctx context.Context, transactionFingerprintID roachpb.TransactionFingerprintID, ev txnEvent,