diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index 4cc058257314..780ff4d55f7c 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -2111,6 +2111,9 @@ Session represents one SQL session. | last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) | | status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) | | end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) | +| num_txns_executed | [int32](#cockroach.server.serverpb.ListSessionsResponse-int32) | | Count of the number of transactions that have been opened on this session. This count includes transactions that are in progress. | [reserved](#support-status) | +| txn_fingerprint_ids | [uint64](#cockroach.server.serverpb.ListSessionsResponse-uint64) | repeated | List of transaction fingerprint IDs in this session. | [reserved](#support-status) | +| total_active_time | [google.protobuf.Duration](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Duration) | | The session's total active time. | [reserved](#support-status) | @@ -2247,6 +2250,9 @@ Session represents one SQL session. | last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) | | status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) | | end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) | +| num_txns_executed | [int32](#cockroach.server.serverpb.ListSessionsResponse-int32) | | Count of the number of transactions that have been opened on this session. This count includes transactions that are in progress. | [reserved](#support-status) | +| txn_fingerprint_ids | [uint64](#cockroach.server.serverpb.ListSessionsResponse-uint64) | repeated | List of transaction fingerprint IDs in this session. | [reserved](#support-status) | +| total_active_time | [google.protobuf.Duration](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Duration) | | The session's total active time. | [reserved](#support-status) | diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index b97a20e8cc75..96037c514428 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -273,6 +273,11 @@ sql.ttl.default_delete_rate_limit integer 0 default delete rate limit for all TT sql.ttl.default_range_concurrency integer 1 default amount of ranges to process at once during a TTL delete sql.ttl.default_select_batch_size integer 500 default amount of rows to select in a single query during a TTL job sql.ttl.job.enabled boolean true whether the TTL job is enabled +<<<<<<< HEAD +======= +sql.ttl.range_batch_size integer 100 amount of ranges to fetch at a time for a table during the TTL job +sql.txn_fingerprint_id_cache.capacity integer 100 the maximum number of txn fingerprint IDs stored +>>>>>>> 4dd1f55533 (server, sql: surface session txnCount, txn fingerprints, active time) timeseries.storage.enabled boolean true if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere timeseries.storage.resolution_10s.ttl duration 240h0m0s the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion. timeseries.storage.resolution_30m.ttl duration 2160h0m0s the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion. diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 10d1417e00b8..0f6f0af548e9 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -204,6 +204,11 @@ sql.ttl.default_range_concurrencyinteger1default amount of ranges to process at once during a TTL delete sql.ttl.default_select_batch_sizeinteger500default amount of rows to select in a single query during a TTL job sql.ttl.job.enabledbooleantruewhether the TTL job is enabled +<<<<<<< HEAD +======= +sql.ttl.range_batch_sizeinteger100amount of ranges to fetch at a time for a table during the TTL job +sql.txn_fingerprint_id_cache.capacityinteger100the maximum number of txn fingerprint IDs stored +>>>>>>> 4dd1f55533 (server, sql: surface session txnCount, txn fingerprints, active time) timeseries.storage.enabledbooleantrueif set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere timeseries.storage.resolution_10s.ttlduration240h0m0sthe maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion. timeseries.storage.resolution_30m.ttlduration2160h0m0sthe maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion. diff --git a/docs/generated/swagger/spec.json b/docs/generated/swagger/spec.json index 84da400c3c76..a5eea69af292 100644 --- a/docs/generated/swagger/spec.json +++ b/docs/generated/swagger/spec.json @@ -773,6 +773,12 @@ }, "x-go-package": "github.com/cockroachdb/cockroach/pkg/server/serverpb" }, + "Duration": { + "description": "A Duration represents the elapsed time between two instants\nas an int64 nanosecond count. The representation limits the\nlargest representable duration to approximately 290 years.", + "type": "integer", + "format": "int64", + "x-go-package": "time" + }, "EventsResponse": { "description": "EventsResponse contains a set of event log entries. This is always limited\nto the latest N entries (N is enforced in the associated endpoint).", "type": "object", @@ -1225,6 +1231,12 @@ "node_id": { "$ref": "#/definitions/NodeID" }, + "num_txns_executed": { + "description": "Count of the number of transactions that have been opened on this session.\nThis count includes transactions that are in progress.", + "type": "integer", + "format": "int32", + "x-go-name": "NumTxnsExecuted" + }, "start": { "description": "Timestamp of session's start.", "type": "string", @@ -1234,6 +1246,17 @@ "status": { "$ref": "#/definitions/Session_Status" }, + "total_active_time": { + "$ref": "#/definitions/Duration" + }, + "txn_fingerprint_ids": { + "description": "List of transaction fingerprint IDs in this session.", + "type": "array", + "items": { + "$ref": "#/definitions/TransactionFingerprintID" + }, + "x-go-name": "TxnFingerprintIDs" + }, "username": { "description": "Username of the user for this session.", "type": "string", @@ -1498,6 +1521,12 @@ }, "x-go-package": "github.com/cockroachdb/cockroach/pkg/util/hlc" }, + "TransactionFingerprintID": { + "description": "TransactionFingerprintID is the hashed string constructed using the\nindividual statement fingerprint IDs that comprise the transaction.", + "type": "integer", + "format": "uint64", + "x-go-package": "github.com/cockroachdb/cockroach/pkg/roachpb" + }, "TxnInfo": { "type": "object", "title": "TxnInfo represents an in flight user transaction on some Session.", diff --git a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant index dfe0b113e969..ec7309b07d59 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant +++ b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant @@ -245,15 +245,15 @@ SELECT * FROM crdb_internal.cluster_transactions WHERE node_id < 0 ---- id node_id session_id start txn_string application_name num_stmts num_retries num_auto_retries last_auto_retry_reason -query ITTTTTTTTTTTTT colnames +query ITTTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.node_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end +node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end -query ITTTTTTTTTTTTT colnames +query ITTTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.cluster_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end +node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end query IIITTTI colnames SELECT * FROM crdb_internal.node_contention_events WHERE table_id < 0 diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto index 73c3d1c2c823..a054f5c2f1fb 100644 --- a/pkg/server/serverpb/status.proto +++ b/pkg/server/serverpb/status.proto @@ -949,9 +949,11 @@ message Session { // Information about the txn in progress on this session. Nil if the // session doesn't currently have a transaction. TxnInfo active_txn = 12; + // The SQL statement fingerprint of the last query executed on this session, // compatible with StatementStatisticsKey. string last_active_query_no_constants = 13; + // Enum for sessions status. enum Status { ACTIVE = 0; @@ -960,9 +962,23 @@ message Session { } // The session's status. Status status = 14; + // Timestamp of session's end. google.protobuf.Timestamp end = 15 [ (gogoproto.nullable) = true, (gogoproto.stdtime) = true ]; + + // Count of the number of transactions that have been opened on this session. + // This count includes transactions that are in progress. + int32 num_txns_executed = 16; + + // List of transaction fingerprint IDs in this session. + repeated uint64 txn_fingerprint_ids = 17 [(gogoproto.customname) = "TxnFingerprintIDs", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TransactionFingerprintID", + (gogoproto.nullable) = false]; + + // The session's total active time. + google.protobuf.Duration total_active_time = 18 [(gogoproto.nullable) = false, + (gogoproto.stdduration) = true]; } // An error wrapper object for ListSessionsResponse. diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index dd593a2f2336..3eff0529147e 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -226,6 +226,7 @@ go_library( "testutils.go", "topk.go", "truncate.go", + "txn_fingerprint_id_cache.go", "txn_state.go", "type_change.go", "unary.go", @@ -585,6 +586,7 @@ go_test( "temporary_schema_test.go", "tenant_test.go", "trace_test.go", + "txn_fingerprint_id_cache_test.go", "txn_restart_test.go", "txn_state_test.go", "type_change_test.go", diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index a5c13e97649e..cb9e8bb0b6fb 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -884,6 +884,8 @@ func (s *Server) newConnExecutor( stmtDiagnosticsRecorder: s.cfg.StmtDiagnosticsRecorder, indexUsageStats: s.indexUsageStats, txnIDCacheWriter: s.txnIDCache, + totalActiveTimeStopWatch: timeutil.NewStopWatch(), + txnFingerprintIDCache: NewTxnFingerprintIDCache(s.cfg.Settings, sessionRootMon), } ex.state.txnAbortCount = ex.metrics.EngineMetrics.TxnAbortCount @@ -1489,6 +1491,14 @@ type connExecutor struct { // txnIDCacheWriter is used to write txnidcache.ResolvedTxnID to the // Transaction ID Cache. txnIDCacheWriter txnidcache.Writer + + // txnFingerprintIDCache is used to track the most recent + // txnFingerprintIDs executed in this session. + txnFingerprintIDCache *TxnFingerprintIDCache + + // totalActiveTimeStopWatch tracks the total active time of the session. + // This is defined as the time spent executing transactions and statements. + totalActiveTimeStopWatch *timeutil.StopWatch } // ctxHolder contains a connection's context and, while session tracing is @@ -2864,6 +2874,10 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( // Start of the transaction, so no statements were executed earlier. // Bump the txn counter for logging. ex.extraTxnState.txnCounter++ + + // Session is considered active when executing a transaction. + ex.totalActiveTimeStopWatch.Start() + if !ex.server.cfg.Codec.ForSystemTenant() { // Update the leased descriptor collection with the current sqlliveness.Session. // This is required in the multi-tenant environment to update the transaction @@ -3119,6 +3133,12 @@ func (ex *connExecutor) serialize() serverpb.Session { remoteStr = sd.RemoteAddr.String() } + txnFingerprintIDs := ex.txnFingerprintIDCache.GetAllTxnFingerprintIDs() + sessionActiveTime := ex.totalActiveTimeStopWatch.Elapsed() + if startedAt, started := ex.totalActiveTimeStopWatch.LastStartedAt(); started { + sessionActiveTime = time.Duration(sessionActiveTime.Nanoseconds() + timeutil.Since(startedAt).Nanoseconds()) + } + return serverpb.Session{ Username: sd.SessionUser().Normalized(), ClientAddress: remoteStr, @@ -3126,12 +3146,15 @@ func (ex *connExecutor) serialize() serverpb.Session { Start: ex.phaseTimes.GetSessionPhaseTime(sessionphase.SessionInit).UTC(), ActiveQueries: activeQueries, ActiveTxn: activeTxnInfo, + NumTxnsExecuted: int32(ex.extraTxnState.txnCounter), + TxnFingerprintIDs: txnFingerprintIDs, LastActiveQuery: lastActiveQuery, ID: ex.sessionID.GetBytes(), AllocBytes: ex.mon.AllocBytes(), MaxAllocBytes: ex.mon.MaximumBytes(), LastActiveQueryNoConstants: lastActiveQueryNoConstants, Status: status, + TotalActiveTime: sessionActiveTime, } } diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index be845b32310f..2d5f1b9d1271 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -2110,6 +2110,14 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) { ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionEndExecTransaction, timeutil.Now()) transactionFingerprintID := roachpb.TransactionFingerprintID(ex.extraTxnState.transactionStatementsHash.Sum()) + + err := ex.txnFingerprintIDCache.Add(transactionFingerprintID) + if err != nil { + if log.V(1) { + log.Warningf(ctx, "failed to enqueue transactionFingerprintID = %d: %s", transactionFingerprintID, err) + } + } + if !implicit { ex.statsCollector.EndExplicitTransaction( ctx, @@ -2123,7 +2131,7 @@ func (ex *connExecutor) onTxnFinish(ctx context.Context, ev txnEvent) { transactionFingerprintID, ) } - err := ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart) + err = ex.recordTransactionFinish(ctx, transactionFingerprintID, ev, implicit, txnStart) if err != nil { if log.V(1) { log.Warningf(ctx, "failed to record transaction stats: %s", err) @@ -2219,6 +2227,7 @@ func (ex *connExecutor) recordTransactionFinish( txnEnd := timeutil.Now() txnTime := txnEnd.Sub(txnStart) + ex.totalActiveTimeStopWatch.Stop() if ex.executorType != executorTypeInternal { ex.metrics.EngineMetrics.SQLTxnsOpen.Dec(1) } diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index 568417f825a6..c7363de4ac55 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -1623,6 +1624,94 @@ func TestEmptyTxnIsBeingCorrectlyCounted(t *testing.T) { "after executing empty transactions, but it was not") } +//TestSessionTotalActiveTime tests that a session's total active time is +//correctly being recorded as transactions are executed. +func TestSessionTotalActiveTime(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + params := base.TestServerArgs{} + s, mainDB, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + _, err := mainDB.Exec(fmt.Sprintf("CREATE USER %s", username.TestUser)) + if err != nil { + t.Fatal(err) + } + + pgURL, cleanupDB := sqlutils.PGUrl( + t, s.ServingSQLAddr(), "TestSessionTotalActiveTime", url.User(username.TestUser)) + defer cleanupDB() + rawSQL, err := gosql.Open("postgres", pgURL.String()) + if err != nil { + t.Fatal(err) + } + + defer func() { + err := rawSQL.Close() + if err != nil { + t.Fatal(err) + } + }() + + getSessionWithTestUser := func() *serverpb.Session { + sessions := s.SQLServer().(*sql.Server).GetExecutorConfig().SessionRegistry.SerializeAll() + for _, s := range sessions { + if s.Username == username.TestUser { + return &s + } + } + t.Fatalf("expected session with username %s", username.TestUser) + return nil + } + + sqlDB := sqlutils.MakeSQLRunner(rawSQL) + sqlDB.Exec(t, "SELECT 1") + session := getSessionWithTestUser() + activeTimeNanos := session.TotalActiveTime.Nanoseconds() + + // We will execute different types of transactions. + // After each execution, verify the total active time has increased, but is no + // longer increasing after the transaction has completed. + testCases := []struct { + Query string + // SessionActiveAfterExecution signifies that the active time should still be active after this query. + SessionActiveAfterExecution bool + }{ + {"SELECT 1", false}, + // Test explicit transaction. + {"BEGIN", true}, + {"SELECT 1", true}, + {"SELECT 1, 2", true}, + {"COMMIT", false}, + {"BEGIN", true}, + {"SELECT crdb_internal.force_retry('1s')", true}, + {"COMMIT", false}, + } + + for _, tc := range testCases { + sqlDB.Exec(t, tc.Query) + if tc.Query == "crdb_internal.force_retry('1s'" { + continue + } + // Check that the total active time has increased. + session = getSessionWithTestUser() + require.Greater(t, session.TotalActiveTime.Nanoseconds(), activeTimeNanos) + + activeTimeNanos = session.TotalActiveTime.Nanoseconds() + session = getSessionWithTestUser() + + if tc.SessionActiveAfterExecution { + require.Greater(t, session.TotalActiveTime.Nanoseconds(), activeTimeNanos) + } else { + require.Equal(t, activeTimeNanos, session.TotalActiveTime.Nanoseconds()) + } + + activeTimeNanos = session.TotalActiveTime.Nanoseconds() + } +} + // dynamicRequestFilter exposes a filter method which is a // kvserverbase.ReplicaRequestFilter but can be set dynamically. type dynamicRequestFilter struct { diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index d1284372d05e..dcf3c615cc13 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -1910,6 +1910,7 @@ CREATE TABLE crdb_internal.%s ( application_name STRING, -- the name of the application as per SET application_name active_queries STRING, -- the currently running queries as SQL last_active_query STRING, -- the query that finished last on this session as SQL + num_txns_executed INT, -- the number of transactions that were executed so far on this session session_start TIMESTAMP, -- the time when the session was opened oldest_query_start TIMESTAMP, -- the time when the oldest query in the session was started kv_txn STRING, -- the ID of the current KV transaction @@ -2011,6 +2012,7 @@ func populateSessionsTable( tree.NewDString(session.ApplicationName), tree.NewDString(activeQueries.String()), tree.NewDString(session.LastActiveQuery), + tree.NewDInt(tree.DInt(session.NumTxnsExecuted)), startTSDatum, oldestStartDatum, kvTxnIDDatum, @@ -2036,6 +2038,7 @@ func populateSessionsTable( tree.DNull, // application name tree.NewDString("-- "+rpcErr.Message), // active queries tree.DNull, // last active query + tree.DNull, // num txns executed tree.DNull, // session start tree.DNull, // oldest_query_start tree.DNull, // kv_txn diff --git a/pkg/sql/delegate/show_sessions.go b/pkg/sql/delegate/show_sessions.go index f4adcc97f7f8..8c8ea6ed8895 100644 --- a/pkg/sql/delegate/show_sessions.go +++ b/pkg/sql/delegate/show_sessions.go @@ -16,7 +16,7 @@ import ( ) func (d *delegator) delegateShowSessions(n *tree.ShowSessions) (tree.Statement, error) { - const query = `SELECT node_id, session_id, status, user_name, client_address, application_name, active_queries, last_active_query, session_start, oldest_query_start FROM crdb_internal.` + const query = `SELECT node_id, session_id, status, user_name, client_address, application_name, active_queries, last_active_query, session_start, oldest_query_start, num_txns_executed FROM crdb_internal.` table := `node_sessions` if n.Cluster { table = `cluster_sessions` diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal b/pkg/sql/logictest/testdata/logic_test/crdb_internal index e270b9fb337d..3b9302392ab4 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal @@ -365,15 +365,15 @@ SELECT * FROM crdb_internal.cluster_transactions WHERE node_id < 0 ---- id node_id session_id start txn_string application_name num_stmts num_retries num_auto_retries last_auto_retry_reason -query ITTTTTTTTTTTTT colnames +query ITTTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.node_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end +node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end -query ITTTTTTTTTTTTT colnames +query ITTTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.cluster_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end +node_id session_id user_name client_address application_name active_queries last_active_query num_txns_executed session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end query IIITTTI colnames SELECT * FROM crdb_internal.node_contention_events WHERE table_id < 0 diff --git a/pkg/sql/logictest/testdata/logic_test/create_statements b/pkg/sql/logictest/testdata/logic_test/create_statements index dd8da9a6ae65..bd8caa0ec624 100644 --- a/pkg/sql/logictest/testdata/logic_test/create_statements +++ b/pkg/sql/logictest/testdata/logic_test/create_statements @@ -326,6 +326,7 @@ CREATE TABLE crdb_internal.cluster_sessions ( application_name STRING NULL, active_queries STRING NULL, last_active_query STRING NULL, + num_txns_executed INT8 NULL, session_start TIMESTAMP NULL, oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, @@ -341,6 +342,7 @@ CREATE TABLE crdb_internal.cluster_sessions ( application_name STRING NULL, active_queries STRING NULL, last_active_query STRING NULL, + num_txns_executed INT8 NULL, session_start TIMESTAMP NULL, oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, @@ -964,6 +966,7 @@ CREATE TABLE crdb_internal.node_sessions ( application_name STRING NULL, active_queries STRING NULL, last_active_query STRING NULL, + num_txns_executed INT8 NULL, session_start TIMESTAMP NULL, oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, @@ -979,6 +982,7 @@ CREATE TABLE crdb_internal.node_sessions ( application_name STRING NULL, active_queries STRING NULL, last_active_query STRING NULL, + num_txns_executed INT8 NULL, session_start TIMESTAMP NULL, oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, diff --git a/pkg/sql/testdata/txn_fingerprint_id_cache b/pkg/sql/testdata/txn_fingerprint_id_cache new file mode 100644 index 000000000000..533be5d3e1bb --- /dev/null +++ b/pkg/sql/testdata/txn_fingerprint_id_cache @@ -0,0 +1,77 @@ +# Initialize TxnFingerprintIDCache +init capacity=10 +---- +size: 0 + +# Add four TxnFingerprintIDs: 5, 6, 7, 8 +enqueue id=05 +---- +size: 1 + +enqueue id=06 +---- +size: 2 + +enqueue id=07 +---- +size: 3 + +enqueue id=08 +---- +size: 4 + +# There should be 4 valid TxnFingerprintIDs +show +---- +[8 7 6 5] + +enqueue id=09 +---- +size: 5 + +enqueue id=10 +---- +size: 6 + +show +---- +[10 9 8 7 6 5] + +# Decrease the TxnFingerprintIDCacheCapacity cluster setting to below current size. +override capacity=3 +---- +TxnFingerprintIDCacheCapacity: 3 + +# Enqueue another id to +enqueue id=11 +---- +size: 3 + +# The cache should have the most recent 3 insertions. +show +---- +[11 10 9] + +# Check that retrieving IDs also properly truncates the cache when the capacity has +# been changed. +# Increase capacity back up to 5, insert some values, then decrease capacity to 2 and +# retrieve all ids. +override capacity=5 +---- +TxnFingerprintIDCacheCapacity: 5 + +enqueue id=12 +---- +size: 4 + +enqueue id=13 +---- +size: 5 + +override capacity=2 +---- +TxnFingerprintIDCacheCapacity: 2 + +show +---- +[13 12] diff --git a/pkg/sql/txn_fingerprint_id_cache.go b/pkg/sql/txn_fingerprint_id_cache.go new file mode 100644 index 000000000000..5f6c154743c7 --- /dev/null +++ b/pkg/sql/txn_fingerprint_id_cache.go @@ -0,0 +1,130 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/cache" + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// TxnFingerprintIDCacheCapacity is the cluster setting that controls the +// capacity of the txn fingerprint ID cache. The cache will be resized +// on the next insert or get operation. +var TxnFingerprintIDCacheCapacity = settings.RegisterIntSetting( + settings.TenantWritable, + "sql.txn_fingerprint_id_cache.capacity", + "the maximum number of txn fingerprint IDs stored", + 100, + settings.NonNegativeInt, +).WithPublic() + +// TxnFingerprintIDCache is a thread-safe cache tracking transaction +// fingerprint IDs at the session level. +type TxnFingerprintIDCache struct { + st *cluster.Settings + + mu struct { + syncutil.RWMutex + acc *mon.BoundAccount + cache *cache.UnorderedCache + } + + mon *mon.BytesMonitor +} + +// NewTxnFingerprintIDCache returns a new TxnFingerprintIDCache. +func NewTxnFingerprintIDCache( + st *cluster.Settings, parentMon *mon.BytesMonitor, +) *TxnFingerprintIDCache { + b := &TxnFingerprintIDCache{st: st} + + b.mu.cache = cache.NewUnorderedCache(cache.Config{ + Policy: cache.CacheFIFO, + ShouldEvict: func(size int, _, _ interface{}) bool { + // Note that because the cache evicts as many elements as possible + // when adding an element, the cache will appropriately truncate + // when the capacity cluster setting is changed on the addition + // of an entry. + capacity := TxnFingerprintIDCacheCapacity.Get(&st.SV) + return int64(size) > capacity + }, + OnEvictedEntry: func(entry *cache.Entry) { + b.mu.acc.Shrink(context.Background(), 1) + }, + }) + + monitor := mon.NewMonitorInheritWithLimit("txn-fingerprint-id-cache", 0 /* limit */, parentMon) + b.mon = monitor + b.mon.Start(context.Background(), parentMon, mon.BoundAccount{}) + + return b +} + +// Add adds a TxnFingerprintID to the cache, truncating the cache to the cache's capacity +// if necessary. +func (b *TxnFingerprintIDCache) Add(value roachpb.TransactionFingerprintID) error { + b.mu.Lock() + defer b.mu.Unlock() + + if err := b.mu.acc.Grow(context.Background(), 1); err != nil { + return err + } + + b.mu.cache.Add(value, value) + + return nil +} + +// GetAllTxnFingerprintIDs returns a slice of all TxnFingerprintIDs in the cache. +// The cache may be truncated if the capacity was updated to a smaller size. +func (b *TxnFingerprintIDCache) GetAllTxnFingerprintIDs() []roachpb.TransactionFingerprintID { + b.mu.Lock() + defer b.mu.Unlock() + + size := int64(b.mu.cache.Len()) + capacity := TxnFingerprintIDCacheCapacity.Get(&b.st.SV) + if size > capacity { + size = capacity + } + + txnFingerprintIDs := make([]roachpb.TransactionFingerprintID, 0, size) + txnFingerprintIDsRemoved := make([]roachpb.TransactionFingerprintID, 0) + + b.mu.cache.Do(func(entry *cache.Entry) { + id := entry.Value.(roachpb.TransactionFingerprintID) + + if int64(len(txnFingerprintIDs)) == size { + txnFingerprintIDsRemoved = append(txnFingerprintIDsRemoved, id) + return + } + + txnFingerprintIDs = append(txnFingerprintIDs, id) + }) + + for _, id := range txnFingerprintIDsRemoved { + b.mu.cache.Del(id) + } + + return txnFingerprintIDs +} + +func (b *TxnFingerprintIDCache) size() int { + b.mu.RLock() + defer b.mu.RUnlock() + + return b.mu.cache.Len() +} diff --git a/pkg/sql/txn_fingerprint_id_cache_test.go b/pkg/sql/txn_fingerprint_id_cache_test.go new file mode 100644 index 000000000000..97de2eef0541 --- /dev/null +++ b/pkg/sql/txn_fingerprint_id_cache_test.go @@ -0,0 +1,159 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + "fmt" + "math" + "sort" + "strconv" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" +) + +func TestTxnFingerprintIDCacheDataDriven(t *testing.T) { + defer leaktest.AfterTest(t)() + var txnFingerprintIDCache *TxnFingerprintIDCache + + datadriven.Walk(t, testutils.TestDataPath(t, "txn_fingerprint_id_cache"), func(t *testing.T, path string) { + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + ctx := context.Background() + switch d.Cmd { + case "init": + var capacity int + d.ScanArgs(t, "capacity", &capacity) + + st := &cluster.Settings{} + monitor := mon.NewUnlimitedMonitor( + ctx, + "test", + mon.MemoryResource, + nil, /* currCount */ + nil, /* maxHist */ + math.MaxInt64, + st, + ) + txnFingerprintIDCache = NewTxnFingerprintIDCache(st, monitor) + + TxnFingerprintIDCacheCapacity.Override(ctx, &st.SV, int64(capacity)) + + return fmt.Sprintf("size: %d", txnFingerprintIDCache.size()) + + case "override": + var capacity int + d.ScanArgs(t, "capacity", &capacity) + TxnFingerprintIDCacheCapacity.Override(ctx, &txnFingerprintIDCache.st.SV, int64(capacity)) + capacityClusterSetting := TxnFingerprintIDCacheCapacity.Get(&txnFingerprintIDCache.st.SV) + return fmt.Sprintf("TxnFingerprintIDCacheCapacity: %d", capacityClusterSetting) + + case "enqueue": + var idStr string + d.ScanArgs(t, "id", &idStr) + + id, err := strconv.ParseUint(idStr, 10, 64) + require.NoError(t, err) + txnFingerprintID := roachpb.TransactionFingerprintID(id) + + err = txnFingerprintIDCache.Add(txnFingerprintID) + require.NoError(t, err) + + return fmt.Sprintf("size: %d", txnFingerprintIDCache.size()) + + case "show": + return printTxnFingerprintIDCache(txnFingerprintIDCache) + + default: + } + return "" + + }) + }) +} + +func printTxnFingerprintIDCache(txnFingerprintCache *TxnFingerprintIDCache) string { + txnFingerprintIDs := txnFingerprintCache.GetAllTxnFingerprintIDs() + + return fmt.Sprintf("%d", txnFingerprintIDs) +} + +func TestTxnFingerprintIDCache(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + txnFingerprintIDsRecorded := make([]roachpb.TransactionFingerprintID, 0) + appName := "testTxnFingerprintIDCache" + + params, _ := tests.CreateTestServerParams() + params.Knobs.SQLExecutor = &ExecutorTestingKnobs{ + BeforeTxnStatsRecorded: func( + sessionData *sessiondata.SessionData, + _ uuid.UUID, + txnFingerprintID roachpb.TransactionFingerprintID, + ) { + if !sessionData.Internal { + // Record every query we issue through our sql connection. + txnFingerprintIDsRecorded = append(txnFingerprintIDsRecorded, txnFingerprintID) + } + }, + } + + testServer, sqlConn, _ := serverutils.StartServer(t, params) + + defer func() { + require.NoError(t, sqlConn.Close()) + testServer.Stopper().Stop(ctx) + }() + + testConn := sqlutils.MakeSQLRunner(sqlConn) + + testConn.Exec(t, "SET application_name = $1", appName) + testConn.Exec(t, "CREATE TABLE test AS SELECT generate_series(1, 10)") + testConn.Exec(t, "SELECT * FROM test") + testConn.Exec(t, "BEGIN; SELECT 1; SELECT 1, 2, 3; COMMIT;") + + sessions := testServer.SQLServer().(*Server).GetExecutorConfig().SessionRegistry.SerializeAll() + + var session *serverpb.Session + for i, s := range sessions { + if s.ApplicationName == appName { + session = &sessions[i] + break + } + } + require.NotNil(t, session) + + sort.Slice(session.TxnFingerprintIDs, func(i, j int) bool { + return session.TxnFingerprintIDs[i] < session.TxnFingerprintIDs[j] + }) + + sort.Slice(txnFingerprintIDsRecorded, func(i, j int) bool { + return txnFingerprintIDsRecorded[i] < txnFingerprintIDsRecorded[j] + }) + + require.Equal(t, txnFingerprintIDsRecorded, session.TxnFingerprintIDs) +} diff --git a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts index be4e732e323d..9e425ac6fa80 100644 --- a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts +++ b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts @@ -47,6 +47,8 @@ export const idleSession: SessionInfo = { alloc_bytes: Long.fromNumber(0), max_alloc_bytes: Long.fromNumber(10240), active_queries: [], + num_txns_executed: 1, + txn_fingerprint_ids: [], status: Status.IDLE, toJSON: () => ({}), }, @@ -85,6 +87,8 @@ export const idleTransactionSession: SessionInfo = { }, last_active_query_no_constants: "SHOW database", active_queries: [], + num_txns_executed: 1, + txn_fingerprint_ids: [], status: Status.IDLE, toJSON: () => ({}), }, @@ -138,6 +142,8 @@ export const activeSession: SessionInfo = { }, last_active_query_no_constants: "SHOW database", status: Status.ACTIVE, + num_txns_executed: 1, + txn_fingerprint_ids: [], toJSON: () => ({}), }, }; @@ -163,6 +169,8 @@ export const closedSession: SessionInfo = { nanos: 369989000, }, status: Status.CLOSED, + num_txns_executed: 1, + txn_fingerprint_ids: [], toJSON: () => ({}), }, }; diff --git a/pkg/util/timeutil/stopwatch.go b/pkg/util/timeutil/stopwatch.go index ed3bf152b517..04c03ca50450 100644 --- a/pkg/util/timeutil/stopwatch.go +++ b/pkg/util/timeutil/stopwatch.go @@ -81,6 +81,14 @@ func (w *StopWatch) Elapsed() time.Duration { return w.mu.elapsed } +// LastStartedAt returns the time the stopwatch was last started, and a bool +// indicating if the stopwatch is currently started. +func (w *StopWatch) LastStartedAt() (startedAt time.Time, started bool) { + w.mu.Lock() + defer w.mu.Unlock() + return w.mu.startedAt, w.mu.started +} + // TestTimeSource is a source of time that remembers when it was created (in // terms of the real time) and returns the time based on its creation time and // the number of "advances" it has had. It is used for testing only.