diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index e318ab43dd71..b93c50b858c6 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -2108,9 +2108,12 @@ Session represents one SQL session. | alloc_bytes | [int64](#cockroach.server.serverpb.ListSessionsResponse-int64) | | Number of currently allocated bytes in the session memory monitor. | [reserved](#support-status) | | max_alloc_bytes | [int64](#cockroach.server.serverpb.ListSessionsResponse-int64) | | High water mark of allocated bytes in the session memory monitor. | [reserved](#support-status) | | active_txn | [TxnInfo](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.TxnInfo) | | Information about the txn in progress on this session. Nil if the session doesn't currently have a transaction. | [reserved](#support-status) | +| num_txns_executed | [int32](#cockroach.server.serverpb.ListSessionsResponse-int32) | | Number of transactions that were executed so far on this session. | [reserved](#support-status) | +| txnFingerprintIDs | [uint64](#cockroach.server.serverpb.ListSessionsResponse-uint64) | repeated | List of transaction fingerprint IDs in this session. | [reserved](#support-status) | | last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) | | status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) | | end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) | +| totalActiveTime | [google.protobuf.Duration](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Duration) | | The session's total active time. | [reserved](#support-status) | @@ -2242,9 +2245,12 @@ Session represents one SQL session. | alloc_bytes | [int64](#cockroach.server.serverpb.ListSessionsResponse-int64) | | Number of currently allocated bytes in the session memory monitor. | [reserved](#support-status) | | max_alloc_bytes | [int64](#cockroach.server.serverpb.ListSessionsResponse-int64) | | High water mark of allocated bytes in the session memory monitor. | [reserved](#support-status) | | active_txn | [TxnInfo](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.TxnInfo) | | Information about the txn in progress on this session. Nil if the session doesn't currently have a transaction. | [reserved](#support-status) | +| num_txns_executed | [int32](#cockroach.server.serverpb.ListSessionsResponse-int32) | | Number of transactions that were executed so far on this session. | [reserved](#support-status) | +| txnFingerprintIDs | [uint64](#cockroach.server.serverpb.ListSessionsResponse-uint64) | repeated | List of transaction fingerprint IDs in this session. | [reserved](#support-status) | | last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) | | status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) | | end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) | +| totalActiveTime | [google.protobuf.Duration](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Duration) | | The session's total active time. | [reserved](#support-status) | diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index ed9ad06da725..593b601d02c5 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -186,6 +186,7 @@ sql.ttl.default_range_concurrency integer 1 default amount of ranges to process sql.ttl.default_select_batch_size integer 500 default amount of rows to select in a single query during a TTL job sql.ttl.job.enabled boolean true whether the TTL job is enabled sql.ttl.range_batch_size integer 100 amount of ranges to fetch at a time for a table during the TTL job +sql.txn_fingerprint_id_buffer.capacity integer 100 the maximum number of txn fingerprint IDs stored timeseries.storage.enabled boolean true if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere timeseries.storage.resolution_10s.ttl duration 240h0m0s the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion. timeseries.storage.resolution_30m.ttl duration 2160h0m0s the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion. diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index e444e83a3c6c..ebd50141da88 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -202,6 +202,7 @@ sql.ttl.default_select_batch_sizeinteger500default amount of rows to select in a single query during a TTL job sql.ttl.job.enabledbooleantruewhether the TTL job is enabled sql.ttl.range_batch_sizeinteger100amount of ranges to fetch at a time for a table during the TTL job +sql.txn_fingerprint_id_buffer.capacityinteger100the maximum number of txn fingerprint IDs stored timeseries.storage.enabledbooleantrueif set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere timeseries.storage.resolution_10s.ttlduration240h0m0sthe maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion. timeseries.storage.resolution_30m.ttlduration2160h0m0sthe maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion. diff --git a/docs/generated/swagger/spec.json b/docs/generated/swagger/spec.json index 69ffddbb0389..506453483330 100644 --- a/docs/generated/swagger/spec.json +++ b/docs/generated/swagger/spec.json @@ -768,6 +768,12 @@ }, "x-go-package": "github.com/cockroachdb/cockroach/pkg/server/serverpb" }, + "Duration": { + "description": "A Duration represents the elapsed time between two instants\nas an int64 nanosecond count. The representation limits the\nlargest representable duration to approximately 290 years.", + "type": "integer", + "format": "int64", + "x-go-package": "time" + }, "EventsResponse": { "description": "EventsResponse contains a set of event log entries. This is always limited\nto the latest N entries (N is enforced in the associated endpoint).", "type": "object", @@ -1220,6 +1226,12 @@ "node_id": { "$ref": "#/definitions/NodeID" }, + "num_txns_executed": { + "description": "Number of transactions that were executed so far on this session.", + "type": "integer", + "format": "int32", + "x-go-name": "NumTxnsExecuted" + }, "start": { "description": "Timestamp of session's start.", "type": "string", @@ -1229,6 +1241,17 @@ "status": { "$ref": "#/definitions/Session_Status" }, + "totalActiveTime": { + "$ref": "#/definitions/Duration" + }, + "txnFingerprintIDs": { + "description": "List of transaction fingerprint IDs in this session.", + "type": "array", + "items": { + "$ref": "#/definitions/TransactionFingerprintID" + }, + "x-go-name": "TxnFingerprintIDs" + }, "username": { "description": "Username of the user for this session.", "type": "string", @@ -1493,6 +1516,12 @@ }, "x-go-package": "github.com/cockroachdb/cockroach/pkg/util/hlc" }, + "TransactionFingerprintID": { + "description": "TransactionFingerprintID is the hashed string constructed using the\nindividual statement fingerprint IDs that comprise the transaction.", + "type": "integer", + "format": "uint64", + "x-go-package": "github.com/cockroachdb/cockroach/pkg/roachpb" + }, "TxnInfo": { "type": "object", "title": "TxnInfo represents an in flight user transaction on some Session.", diff --git a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant index daeaa524be6b..e92efa21cc3e 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant +++ b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant @@ -243,15 +243,15 @@ SELECT * FROM crdb_internal.cluster_transactions WHERE node_id < 0 ---- id node_id session_id start txn_string application_name num_stmts num_retries num_auto_retries -query ITTTTTTTTTTTTT colnames +query ITTTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.node_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end +node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end -query ITTTTTTTTTTTTT colnames +query ITTTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.cluster_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end +node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end query IIITTTI colnames SELECT * FROM crdb_internal.node_contention_events WHERE table_id < 0 diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto index f6c5ab35c2d2..c23400eb9e57 100644 --- a/pkg/server/serverpb/status.proto +++ b/pkg/server/serverpb/status.proto @@ -944,6 +944,12 @@ message Session { // Information about the txn in progress on this session. Nil if the // session doesn't currently have a transaction. TxnInfo active_txn = 12; + // Number of transactions that were executed so far on this session. + int32 num_txns_executed = 16; + // List of transaction fingerprint IDs in this session. + repeated uint64 txnFingerprintIDs = 17 [(gogoproto.customname) = "TxnFingerprintIDs", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TransactionFingerprintID", + (gogoproto.nullable) = false]; // The SQL statement fingerprint of the last query executed on this session, // compatible with StatementStatisticsKey. string last_active_query_no_constants = 13; @@ -958,6 +964,9 @@ message Session { // Timestamp of session's end. google.protobuf.Timestamp end = 15 [ (gogoproto.nullable) = true, (gogoproto.stdtime) = true ]; + // The session's total active time. + google.protobuf.Duration totalActiveTime = 18 [(gogoproto.nullable) = false, + (gogoproto.stdduration) = true]; } // An error wrapper object for ListSessionsResponse. diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 32f3fb60276e..1c0cf15b6800 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -223,6 +223,7 @@ go_library( "topk.go", "truncate.go", "txn_state.go", + "txn_fingerprint_id_buffer.go", "type_change.go", "unary.go", "union.go", diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 8f6e937266c4..32e608adcb39 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -883,6 +883,8 @@ func (s *Server) newConnExecutor( stmtDiagnosticsRecorder: s.cfg.StmtDiagnosticsRecorder, indexUsageStats: s.indexUsageStats, txnIDCacheWriter: s.txnIDCache, + totalActiveTimeStopWatch: timeutil.NewStopWatch(), + txnFingerprintIDBuffer: NewTxnFingerprintIDBuffer(s.cfg.Settings, s.cfg.RootMemoryMonitor), } ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount @@ -1497,6 +1499,13 @@ type connExecutor struct { // txnIDCacheWriter is used to write txnidcache.ResolvedTxnID to the // Transaction ID Cache. txnIDCacheWriter txnidcache.Writer + + // txnFingerprintIDBuffer is a circular buffer keeping track of the + // txnFingerprintIDs in this session. + txnFingerprintIDBuffer *TxnFingerprintIDBuffer + + // totalActiveTimeStopWatch tracks the total active time of the session. + totalActiveTimeStopWatch *timeutil.StopWatch } // ctxHolder contains a connection's context and, while session tracing is @@ -3115,6 +3124,8 @@ func (ex *connExecutor) serialize() serverpb.Session { remoteStr = sd.RemoteAddr.String() } + txnFingerprintIDs := ex.txnFingerprintIDBuffer.GetAllTxnFingerprintIDs() + return serverpb.Session{ Username: sd.SessionUser().Normalized(), ClientAddress: remoteStr, @@ -3122,12 +3133,15 @@ func (ex *connExecutor) serialize() serverpb.Session { Start: ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionInit).UTC(), ActiveQueries: activeQueries, ActiveTxn: activeTxnInfo, + NumTxnsExecuted: int32(ex.extraTxnState.txnCounter), + TxnFingerprintIDs: txnFingerprintIDs, LastActiveQuery: lastActiveQuery, ID: ex.sessionID.GetBytes(), AllocBytes: ex.mon.AllocBytes(), MaxAllocBytes: ex.mon.MaximumBytes(), LastActiveQueryNoConstants: lastActiveQueryNoConstants, Status: status, + TotalActiveTime: ex.totalActiveTimeStopWatch.Elapsed(), } } diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 536429c99817..0adda4d8af15 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -2101,6 +2101,12 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) { ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionEndExecTransaction, timeutil.Now()) transactionFingerprintID := roachpb.TransactionFingerprintID(ex.extraTxnState.transactionStatementsHash.Sum()) + err := ex.txnFingerprintIDBuffer.Enqueue(transactionFingerprintID) + if err != nil { + if log.V(1) { + log.Warningf(ctx, "failed to enqueue transactionFingerprintID = %d: %s", transactionFingerprintID, err) + } + } if !implicit { ex.statsCollector.EndExplicitTransaction( ctx, @@ -2114,7 +2120,7 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) { transactionFingerprintID, ) } - err := ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart) + err = ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart) if err != nil { if log.V(1) { log.Warningf(ctx, "failed to record transaction stats: %s", err) @@ -2126,6 +2132,8 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) { func (ex *connExecutor) onTxnRestart(ctx context.Context) { if ex.extraTxnState.shouldExecuteOnTxnRestart { + ex.totalActiveTimeStopWatch.Stop() + defer ex.totalActiveTimeStopWatch.Start() ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionMostRecentStartExecTransaction, timeutil.Now()) ex.extraTxnState.transactionStatementFingerprintIDs = nil ex.extraTxnState.transactionStatementsHash = util.MakeFNV64() @@ -2157,6 +2165,8 @@ func (ex *connExecutor) recordTransactionStart(txnID uuid.UUID) { ex.state.mu.RUnlock() implicit := ex.implicitTxn() + ex.totalActiveTimeStopWatch.Start() + // Transaction received time is the time at which the statement that prompted // the creation of this transaction was received. ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionTransactionReceived, @@ -2210,6 +2220,7 @@ func (ex *connExecutor) recordTransactionFinish( txnEnd := timeutil.Now() txnTime := txnEnd.Sub(txnStart) + ex.totalActiveTimeStopWatch.Stop() if ex.executorType != executorTypeInternal { ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1) } diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 53809d47f5d4..b601f2845ea5 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -1615,7 +1615,7 @@ CREATE TABLE crdb_internal.%s ( node_id INT, -- the ID of the node running the transaction session_id STRING, -- the ID of the session start TIMESTAMP, -- the start time of the transaction - txn_string STRING, -- the string representation of the transcation + txn_string STRING, -- the string representation of the transaction application_name STRING, -- the name of the application as per SET application_name num_stmts INT, -- the number of statements executed so far num_retries INT, -- the number of times the transaction was restarted @@ -1894,6 +1894,7 @@ CREATE TABLE crdb_internal.%s ( application_name STRING, -- the name of the application as per SET application_name active_queries STRING, -- the currently running queries as SQL last_active_query STRING, -- the query that finished last on this session as SQL + num_txns_executed INT, -- the number of transactions that were executed so far on this session session_start TIMESTAMP, -- the time when the session was opened oldest_query_start TIMESTAMP, -- the time when the oldest query in the session was started kv_txn STRING, -- the ID of the current KV transaction @@ -1995,6 +1996,7 @@ func populateSessionsTable( tree.NewDString(session.ApplicationName), tree.NewDString(activeQueries.String()), tree.NewDString(session.LastActiveQuery), + tree.NewDInt(tree.DInt(session.NumTxnsExecuted)), startTSDatum, oldestStartDatum, kvTxnIDDatum, @@ -2020,6 +2022,7 @@ func populateSessionsTable( tree.DNull, // application name tree.NewDString("-- "+rpcErr.Message), // active queries tree.DNull, // last active query + tree.DNull, // num txns executed tree.DNull, // session start tree.DNull, // oldest_query_start tree.DNull, // kv_txn diff --git a/pkg/sql/delegate/show_sessions.go b/pkg/sql/delegate/show_sessions.go index 9dc08aba57c7..638d0358550b 100644 --- a/pkg/sql/delegate/show_sessions.go +++ b/pkg/sql/delegate/show_sessions.go @@ -16,7 +16,7 @@ import ( ) func (d *delegator) delegateShowSessions(n *tree.ShowSessions) (tree.Statement, error) { - const query = `SELECT node_id, session_id, status, user_name, client_address, application_name, active_queries, last_active_query, session_start, oldest_query_start FROM crdb_internal.` + const query = `SELECT node_id, session_id, status, user_name, client_address, application_name, active_queries, last_active_query, num_txns_executed, session_start, oldest_query_start FROM crdb_internal.` table := `node_sessions` if n.Cluster { table = `cluster_sessions` diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal b/pkg/sql/logictest/testdata/logic_test/crdb_internal index 0f49f9c0c072..65ff7d342f19 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal @@ -364,15 +364,15 @@ SELECT * FROM crdb_internal.cluster_transactions WHERE node_id < 0 ---- id node_id session_id start txn_string application_name num_stmts num_retries num_auto_retries -query ITTTTTTTTTTTTT colnames +query ITTTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.node_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end +node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end -query ITTTTTTTTTTTTT colnames +query ITTTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.cluster_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end +node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end query IIITTTI colnames SELECT * FROM crdb_internal.node_contention_events WHERE table_id < 0 diff --git a/pkg/sql/logictest/testdata/logic_test/create_statements b/pkg/sql/logictest/testdata/logic_test/create_statements index 3c197f49016d..6c37ac511b1d 100644 --- a/pkg/sql/logictest/testdata/logic_test/create_statements +++ b/pkg/sql/logictest/testdata/logic_test/create_statements @@ -316,6 +316,7 @@ CREATE TABLE crdb_internal.cluster_sessions ( application_name STRING NULL, active_queries STRING NULL, last_active_query STRING NULL, + num_txns_executed INT8 NULL, session_start TIMESTAMP NULL, oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, @@ -331,6 +332,7 @@ CREATE TABLE crdb_internal.cluster_sessions ( application_name STRING NULL, active_queries STRING NULL, last_active_query STRING NULL, + num_txns_executed INT8 NULL, session_start TIMESTAMP NULL, oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, @@ -939,6 +941,7 @@ CREATE TABLE crdb_internal.node_sessions ( application_name STRING NULL, active_queries STRING NULL, last_active_query STRING NULL, + num_txns_executed INT8 NULL, session_start TIMESTAMP NULL, oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, @@ -954,6 +957,7 @@ CREATE TABLE crdb_internal.node_sessions ( application_name STRING NULL, active_queries STRING NULL, last_active_query STRING NULL, + num_txns_executed INT8 NULL, session_start TIMESTAMP NULL, oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, diff --git a/pkg/sql/testdata/txn_fingerprint_id_buffer b/pkg/sql/testdata/txn_fingerprint_id_buffer new file mode 100644 index 000000000000..918cf08f34e1 --- /dev/null +++ b/pkg/sql/testdata/txn_fingerprint_id_buffer @@ -0,0 +1,95 @@ +# Initialize TxnFingerprintIDBuffer +init capacity=10 +---- +buffer_size: 0 + +# Add four TxnFingerprintIDs: 5, 6, 7, 8 +enqueue id=05 +---- +buffer_size: 1 + +enqueue id=06 +---- +buffer_size: 2 + +enqueue id=07 +---- +buffer_size: 3 + +enqueue id=08 +---- +buffer_size: 4 + +dequeue +---- +txnFingerprintID: 5 + +dequeue +---- +txnFingerprintID: 6 + +# There should be 2 valid TxnFingerprintIDs +show +---- +0 -> 0 +1 -> 0 +2 -> 7 +3 -> 8 +4 -> 0 +5 -> 0 +6 -> 0 +7 -> 0 +8 -> 0 +9 -> 0 + +enqueue id=09 +---- +buffer_size: 3 + +getAllTxnFingerprintIDs +---- +[7 8 9] + +# Increase the TxnFingerprintIDBufferCapacity cluster setting +override capacity=12 +---- +TxnFingerprintIDBufferCapacity: 12 + +# Enqueue another txnFingerprintID so the override takes effect +enqueue id=10 +---- +buffer_size: 4 + +# The buffer should look like this now: +show +---- +0 -> 7 +1 -> 8 +2 -> 9 +3 -> 10 +4 -> 0 +5 -> 0 +6 -> 0 +7 -> 0 +8 -> 0 +9 -> 0 +10 -> 0 +11 -> 0 + +# Decrease the TxnFingerprintIDBufferCapacity cluster setting to one below the +# size of the buffer +override capacity=3 +---- +TxnFingerprintIDBufferCapacity: 3 + +# Enqueue another id to overwrite the 0th txnFingerprintID +enqueue id=11 +---- +buffer_size: 3 + +# The buffer should look like this now: +show +---- +0 -> 11 +1 -> 8 +2 -> 9 diff --git a/pkg/sql/txn_fingerprint_id_buffer.go b/pkg/sql/txn_fingerprint_id_buffer.go new file mode 100644 index 000000000000..0c54518b9e02 --- /dev/null +++ b/pkg/sql/txn_fingerprint_id_buffer.go @@ -0,0 +1,182 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// TxnFingerprintIDBufferCapacity is the cluster setting that controls the +// capacity of the txn fingerprint ID circular buffer. +var TxnFingerprintIDBufferCapacity = settings.RegisterIntSetting( + settings.TenantWritable, + "sql.txn_fingerprint_id_buffer.capacity", + "the maximum number of txn fingerprint IDs stored", + 100, +).WithPublic() + +// TxnFingerprintIDBuffer is a thread-safe circular buffer tracking transaction +// fingerprint IDs at the session level. +type TxnFingerprintIDBuffer struct { + st *cluster.Settings + + mu struct { + syncutil.RWMutex + data []roachpb.TransactionFingerprintID + acc mon.BoundAccount + size int + capacity int64 + readPosition int + writePosition int + } + + mon *mon.BytesMonitor +} + +// NewTxnFingerprintIDBuffer returns a new TxnFingerprintIDBuffer. +func NewTxnFingerprintIDBuffer( + st *cluster.Settings, parentMon *mon.BytesMonitor, +) *TxnFingerprintIDBuffer { + b := &TxnFingerprintIDBuffer{st: st} + b.initializeBufferLocked(TxnFingerprintIDBufferCapacity.Get(&st.SV)) + + monitor := mon.NewMonitorInheritWithLimit("txn-fingerprint-id-buffer", 0 /* limit */, parentMon) + b.mu.acc = monitor.MakeBoundAccount() + b.mon = monitor + b.mon.Start(context.Background(), parentMon, mon.BoundAccount{}) + + return b +} + +// initializeBufferLocked initializes a new buffer with the given capacity. +func (b *TxnFingerprintIDBuffer) initializeBufferLocked(capacity int64) { + b.mu.capacity = capacity + b.mu.data = make([]roachpb.TransactionFingerprintID, capacity) + for i := range b.mu.data { + b.mu.data[i] = roachpb.InvalidTransactionFingerprintID + } +} + +// Enqueue adds a TxnFingerprintID to the circular buffer. +func (b *TxnFingerprintIDBuffer) Enqueue(value roachpb.TransactionFingerprintID) error { + b.mu.Lock() + defer b.mu.Unlock() + + b.checkForCapacityLocked() + if b.mu.size >= int(b.mu.capacity) { + b.dequeueLocked() + } + + if b.mu.writePosition >= int(b.mu.capacity) { + b.mu.writePosition = 0 + } + + size := value.Size() + err := b.mu.acc.Grow(context.Background(), size) + if err != nil { + return err + } + + b.mu.data[b.mu.writePosition] = value + b.mu.writePosition++ + b.mu.size++ + + return nil +} + +// checkForCapacityLocked checks if the TxnFingerprintIDBufferCapacity cluster +// setting has been updated. +func (b *TxnFingerprintIDBuffer) checkForCapacityLocked() { + capacityClusterSetting := TxnFingerprintIDBufferCapacity.Get(&b.st.SV) + if b.mu.capacity != capacityClusterSetting { + b.updateCapacityLocked(capacityClusterSetting) + } +} + +// updateCapacityLocked updates the capacity of the circular buffer and moves +// the data into a new slice with that capacity. +func (b *TxnFingerprintIDBuffer) updateCapacityLocked(newCapacity int64) { + newData := make([]roachpb.TransactionFingerprintID, newCapacity) + oldData := b.mu.data + + ptr := 0 + b.mu.size = 0 + b.initializeBufferLocked(newCapacity) + for _, txnFingerprintID := range oldData { + if txnFingerprintID != roachpb.InvalidTransactionFingerprintID { + if ptr >= int(newCapacity) { + break + } + newData[ptr] = txnFingerprintID + b.mu.size++ + ptr++ + } + } + + b.mu.data = newData + b.mu.readPosition = 0 + b.mu.writePosition = b.mu.size +} + +// GetAllTxnFingerprintIDs returns a slice of all TxnFingerprintIDs in the +// circular buffer. +func (b *TxnFingerprintIDBuffer) GetAllTxnFingerprintIDs() []roachpb.TransactionFingerprintID { + b.mu.Lock() + defer b.mu.Unlock() + + var txnFingerprintIDs []roachpb.TransactionFingerprintID + if b.mu.data[b.mu.readPosition] != roachpb.InvalidTransactionFingerprintID { + txnFingerprintIDs = append(txnFingerprintIDs, b.mu.data[b.mu.readPosition]) + } + + ptr := b.mu.readPosition + 1 + for ptr != b.mu.readPosition { + if b.mu.data[ptr] != roachpb.InvalidTransactionFingerprintID { + txnFingerprintIDs = append(txnFingerprintIDs, b.mu.data[ptr]) + } + ptr = (ptr + 1) % int(b.mu.capacity) + } + + return txnFingerprintIDs +} + +// dequeue returns the oldest transaction fingerprint ID +func (b *TxnFingerprintIDBuffer) dequeue() roachpb.TransactionFingerprintID { + b.mu.Lock() + defer b.mu.Unlock() + + return b.dequeueLocked() +} + +func (b *TxnFingerprintIDBuffer) dequeueLocked() roachpb.TransactionFingerprintID { + txnFingerprintID := b.mu.data[b.mu.readPosition] + b.mu.data[b.mu.readPosition] = roachpb.InvalidTransactionFingerprintID + + size := txnFingerprintID.Size() + b.mu.acc.Shrink(context.Background(), size) + b.mu.size-- + b.mu.readPosition++ + + return txnFingerprintID +} + +func (b *TxnFingerprintIDBuffer) size() int { + b.mu.Lock() + defer b.mu.Unlock() + + return b.mu.size +} diff --git a/pkg/sql/txn_fingerprint_id_buffer_test.go b/pkg/sql/txn_fingerprint_id_buffer_test.go new file mode 100644 index 000000000000..98b03da3f8d4 --- /dev/null +++ b/pkg/sql/txn_fingerprint_id_buffer_test.go @@ -0,0 +1,104 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + "fmt" + "math" + "strconv" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" +) + +func TestTxnFingerprintIDBuffer(t *testing.T) { + defer leaktest.AfterTest(t)() + var txnFingerprintIDBuffer *TxnFingerprintIDBuffer + + datadriven.Walk(t, testutils.TestDataPath(t, "txn_fingerprint_id_buffer"), func(t *testing.T, path string) { + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + ctx := context.Background() + switch d.Cmd { + case "init": + var capacity int + d.ScanArgs(t, "capacity", &capacity) + + st := &cluster.Settings{} + monitor := mon.NewUnlimitedMonitor( + ctx, + "test", + mon.MemoryResource, + nil, /* currCount */ + nil, /* maxHist */ + math.MaxInt64, + st, + ) + txnFingerprintIDBuffer = NewTxnFingerprintIDBuffer(st, monitor) + + TxnFingerprintIDBufferCapacity.Override(ctx, &st.SV, int64(capacity)) + + return fmt.Sprintf("buffer_size: %d", txnFingerprintIDBuffer.size()) + case "enqueue": + var idStr string + d.ScanArgs(t, "id", &idStr) + + id, err := strconv.ParseUint(idStr, 10, 64) + require.NoError(t, err) + txnFingerprintID := roachpb.TransactionFingerprintID(id) + + err = txnFingerprintIDBuffer.Enqueue(txnFingerprintID) + require.NoError(t, err) + + return fmt.Sprintf("buffer_size: %d", txnFingerprintIDBuffer.size()) + case "dequeue": + txnFingerprintID := txnFingerprintIDBuffer.dequeue() + return fmt.Sprintf("txnFingerprintID: %d", txnFingerprintID) + case "override": + var capacity int + d.ScanArgs(t, "capacity", &capacity) + TxnFingerprintIDBufferCapacity.Override(ctx, &txnFingerprintIDBuffer.st.SV, int64(capacity)) + capacityClusterSetting := TxnFingerprintIDBufferCapacity.Get(&txnFingerprintIDBuffer.st.SV) + return fmt.Sprintf("TxnFingerprintIDBufferCapacity: %d", capacityClusterSetting) + case "show": + return printTxnFingerprintIDBuffer(txnFingerprintIDBuffer) + case "getAllTxnFingerprintIDs": + txnFingerprintIDs := txnFingerprintIDBuffer.GetAllTxnFingerprintIDs() + + return fmt.Sprintf("%d\n", txnFingerprintIDs) + } + return "" + + }) + }) +} + +func printTxnFingerprintIDBuffer(buffer *TxnFingerprintIDBuffer) string { + buffer.mu.Lock() + defer buffer.mu.Unlock() + + var result []string + for i, txnFingerprintID := range buffer.mu.data { + result = append(result, fmt.Sprintf("%d -> %d", i, txnFingerprintID)) + } + if len(result) == 0 { + return "empty" + } + + return strings.Join(result, "\n") +} diff --git a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts index 9ab8b2746f8e..e16010eb3091 100644 --- a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts +++ b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts @@ -47,6 +47,8 @@ export const idleSession: SessionInfo = { alloc_bytes: Long.fromNumber(0), max_alloc_bytes: Long.fromNumber(10240), active_queries: [], + num_txns_executed: 1, + txnFingerprintIDs: [], status: Status.IDLE, toJSON: () => ({}), }, @@ -85,6 +87,8 @@ export const idleTransactionSession: SessionInfo = { }, last_active_query_no_constants: "SHOW database", active_queries: [], + num_txns_executed: 1, + txnFingerprintIDs: [], status: Status.IDLE, toJSON: () => ({}), }, @@ -138,6 +142,8 @@ export const activeSession: SessionInfo = { }, last_active_query_no_constants: "SHOW database", status: Status.ACTIVE, + num_txns_executed: 1, + txnFingerprintIDs: [], toJSON: () => ({}), }, }; @@ -163,6 +169,8 @@ export const closedSession: SessionInfo = { nanos: 369989000, }, status: Status.CLOSED, + num_txns_executed: 1, + txnFingerprintIDs: [], toJSON: () => ({}), }, };