diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index a7894badd201..e318ab43dd71 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -2063,6 +2063,7 @@ Request object for ListSessions and ListLocalSessions. | Field | Type | Label | Description | Support status | | ----- | ---- | ----- | ----------- | -------------- | | username | [string](#cockroach.server.serverpb.ListSessionsRequest-string) | | Username of the user making this request. The caller is responsible to normalize the username (= case fold and perform unicode NFC normalization). | [reserved](#support-status) | +| exclude_closed_sessions | [bool](#cockroach.server.serverpb.ListSessionsRequest-bool) | | Boolean to exclude closed sessions; if unspecified, defaults to false and closed sessions are included in the response. | [reserved](#support-status) | @@ -2108,6 +2109,8 @@ Session represents one SQL session. | max_alloc_bytes | [int64](#cockroach.server.serverpb.ListSessionsResponse-int64) | | High water mark of allocated bytes in the session memory monitor. | [reserved](#support-status) | | active_txn | [TxnInfo](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.TxnInfo) | | Information about the txn in progress on this session. Nil if the session doesn't currently have a transaction. | [reserved](#support-status) | | last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) | +| status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) | +| end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) | @@ -2194,6 +2197,7 @@ Request object for ListSessions and ListLocalSessions. | Field | Type | Label | Description | Support status | | ----- | ---- | ----- | ----------- | -------------- | | username | [string](#cockroach.server.serverpb.ListSessionsRequest-string) | | Username of the user making this request. The caller is responsible to normalize the username (= case fold and perform unicode NFC normalization). | [reserved](#support-status) | +| exclude_closed_sessions | [bool](#cockroach.server.serverpb.ListSessionsRequest-bool) | | Boolean to exclude closed sessions; if unspecified, defaults to false and closed sessions are included in the response. | [reserved](#support-status) | @@ -2239,6 +2243,8 @@ Session represents one SQL session. | max_alloc_bytes | [int64](#cockroach.server.serverpb.ListSessionsResponse-int64) | | High water mark of allocated bytes in the session memory monitor. | [reserved](#support-status) | | active_txn | [TxnInfo](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.TxnInfo) | | Information about the txn in progress on this session. Nil if the session doesn't currently have a transaction. | [reserved](#support-status) | | last_active_query_no_constants | [string](#cockroach.server.serverpb.ListSessionsResponse-string) | | The SQL statement fingerprint of the last query executed on this session, compatible with StatementStatisticsKey. | [reserved](#support-status) | +| status | [Session.Status](#cockroach.server.serverpb.ListSessionsResponse-cockroach.server.serverpb.Session.Status) | | The session's status. | [reserved](#support-status) | +| end | [google.protobuf.Timestamp](#cockroach.server.serverpb.ListSessionsResponse-google.protobuf.Timestamp) | | Timestamp of session's end. | [reserved](#support-status) | diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index e0b21475145d..651c41902924 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -87,7 +87,7 @@ server.web_session.purge.period duration 1h0m0s the time until old sessions are server.web_session.purge.ttl duration 1h0m0s if nonzero, entries in system.web_sessions older than this duration are periodically purged server.web_session_timeout duration 168h0m0s the duration that a newly created web session will be valid sql.auth.resolve_membership_single_scan.enabled boolean true determines whether to populate the role membership cache with a single scan -sql.closed_session_cache.capacity integer 100 the maximum number of sessions in the cache +sql.closed_session_cache.capacity integer 1000 the maximum number of sessions in the cache sql.closed_session_cache.time_to_live integer 3600 the maximum time to live, in seconds sql.contention.event_store.capacity byte size 64 MiB the in-memory storage capacity per-node of contention event store sql.contention.event_store.duration_threshold duration 0s minimum contention duration to cause the contention events to be collected into crdb_internal.transaction_contention_events diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index e28194f2d2af..2f5ac811597d 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -102,7 +102,7 @@ server.web_session.purge.ttlduration1h0m0sif nonzero, entries in system.web_sessions older than this duration are periodically purged server.web_session_timeoutduration168h0m0sthe duration that a newly created web session will be valid sql.auth.resolve_membership_single_scan.enabledbooleantruedetermines whether to populate the role membership cache with a single scan -sql.closed_session_cache.capacityinteger100the maximum number of sessions in the cache +sql.closed_session_cache.capacityinteger1000the maximum number of sessions in the cache sql.closed_session_cache.time_to_liveinteger3600the maximum time to live, in seconds sql.contention.event_store.capacitybyte size64 MiBthe in-memory storage capacity per-node of contention event store sql.contention.event_store.duration_thresholdduration0sminimum contention duration to cause the contention events to be collected into crdb_internal.transaction_contention_events diff --git a/docs/generated/swagger/spec.json b/docs/generated/swagger/spec.json index f6bd69dd9c43..69ffddbb0389 100644 --- a/docs/generated/swagger/spec.json +++ b/docs/generated/swagger/spec.json @@ -551,6 +551,12 @@ "name": "username", "in": "query" }, + { + "type": "bool", + "description": "Boolean to exclude closed sessions; if unspecified, defaults to false and closed sessions are included in the response.", + "name": "exclude_closed_sessions", + "in": "query" + }, { "type": "integer", "description": "Maximum number of results to return in this call.", @@ -1180,6 +1186,12 @@ "type": "string", "x-go-name": "ClientAddress" }, + "end": { + "description": "Timestamp of session's end.", + "type": "string", + "format": "date-time", + "x-go-name": "End" + }, "id": { "description": "ID of the session (uint128 represented as raw bytes).", "type": "array", @@ -1214,6 +1226,9 @@ "format": "date-time", "x-go-name": "Start" }, + "status": { + "$ref": "#/definitions/Session_Status" + }, "username": { "description": "Username of the user for this session.", "type": "string", @@ -1222,6 +1237,12 @@ }, "x-go-package": "github.com/cockroachdb/cockroach/pkg/server/serverpb" }, + "Session_Status": { + "type": "integer", + "format": "int32", + "title": "Enum for sessions status.", + "x-go-package": "github.com/cockroachdb/cockroach/pkg/server/serverpb" + }, "StoreID": { "type": "integer", "format": "int32", diff --git a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant index abd649705951..be2840a77b20 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant +++ b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant @@ -242,15 +242,15 @@ SELECT * FROM crdb_internal.cluster_transactions WHERE node_id < 0 ---- id node_id session_id start txn_string application_name num_stmts num_retries num_auto_retries -query ITTTTTTTTTTT colnames +query ITTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.node_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes +node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end -query ITTTTTTTTTTT colnames +query ITTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.cluster_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes +node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end query IIITTTI colnames SELECT * FROM crdb_internal.node_contention_events WHERE table_id < 0 diff --git a/pkg/ccl/serverccl/statusccl/tenant_status_test.go b/pkg/ccl/serverccl/statusccl/tenant_status_test.go index ece17ea56773..9aeb0bdf2cda 100644 --- a/pkg/ccl/serverccl/statusccl/tenant_status_test.go +++ b/pkg/ccl/serverccl/statusccl/tenant_status_test.go @@ -806,7 +806,8 @@ WHERE func selectClusterSessionIDs(t *testing.T, conn *sqlutils.SQLRunner) []string { var sessionIDs []string - rows := conn.QueryStr(t, "SELECT session_id FROM crdb_internal.cluster_sessions") + rows := conn.QueryStr(t, + "SELECT session_id FROM crdb_internal.cluster_sessions WHERE status = 'ACTIVE' OR status = 'IDLE'") for _, row := range rows { sessionIDs = append(sessionIDs, row[0]) } @@ -822,7 +823,7 @@ func testTenantStatusCancelSession(t *testing.T, helper *tenantTestHelper) { httpPod1 := helper.testCluster().tenantAdminHTTPClient(t, 1) defer httpPod1.Close() listSessionsResp := serverpb.ListSessionsResponse{} - httpPod1.GetJSON("/_status/sessions", &listSessionsResp) + httpPod1.GetJSON("/_status/sessions?exclude_closed_sessions=true", &listSessionsResp) var session serverpb.Session for _, s := range listSessionsResp.Sessions { if s.LastActiveQuery == "SELECT 1" { diff --git a/pkg/server/api_v2.go b/pkg/server/api_v2.go index 8b8fb96fb5b7..2f4336b621ee 100644 --- a/pkg/server/api_v2.go +++ b/pkg/server/api_v2.go @@ -227,6 +227,12 @@ type listSessionsResponse struct { // description: Username of user to return sessions for; if unspecified, // sessions from all users are returned. // required: false +// - name: exclude_closed_sessions +// type: bool +// in: query +// description: Boolean to exclude closed sessions; if unspecified, defaults +// to false and closed sessions are included in the response. +// required: false // - name: limit // type: integer // in: query @@ -250,7 +256,8 @@ func (a *apiV2Server) listSessions(w http.ResponseWriter, r *http.Request) { ctx := r.Context() limit, start := getRPCPaginationValues(r) reqUsername := r.URL.Query().Get("username") - req := &serverpb.ListSessionsRequest{Username: reqUsername} + reqExcludeClosed := r.URL.Query().Get("exclude_closed_sessions") == "true" + req := &serverpb.ListSessionsRequest{Username: reqUsername, ExcludeClosedSessions: reqExcludeClosed} response := &listSessionsResponse{} outgoingCtx := apiToOutgoingGatewayCtx(ctx, r) diff --git a/pkg/server/api_v2_test.go b/pkg/server/api_v2_test.go index ade26d5430b8..f6a7f444bdcd 100644 --- a/pkg/server/api_v2_test.go +++ b/pkg/server/api_v2_test.go @@ -59,6 +59,7 @@ func TestListSessionsV2(t *testing.T) { req, err := http.NewRequest("GET", ts1.AdminURL()+apiV2Path+"sessions/", nil) require.NoError(t, err) query := req.URL.Query() + query.Add("exclude_closed_sessions", "true") if limit > 0 { query.Add("limit", strconv.Itoa(limit)) } diff --git a/pkg/server/server.go b/pkg/server/server.go index c1820a07d7d0..c37d2c1e162b 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -689,6 +689,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { sHTTP := newHTTPServer(cfg.BaseConfig, rpcContext, parseNodeIDFn, getNodeIDHTTPAddressFn) sessionRegistry := sql.NewSessionRegistry() + closedSessionCache := sql.NewClosedSessionCache(cfg.Settings, sqlMonitorAndMetrics.rootSQLMemoryMonitor, time.Now) flowScheduler := flowinfra.NewFlowScheduler(cfg.AmbientCtx, stopper, st) sStatus := newStatusServer( @@ -706,6 +707,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { node.stores, stopper, sessionRegistry, + closedSessionCache, flowScheduler, internalExecutor, ) @@ -756,6 +758,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { registry: registry, recorder: recorder, sessionRegistry: sessionRegistry, + closedSessionCache: closedSessionCache, flowScheduler: flowScheduler, circularInternalExecutor: internalExecutor, circularJobRegistry: jobRegistry, diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index ccf889095338..9e21ec14091c 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -135,6 +135,7 @@ type SQLServer struct { // sessionRegistry can be queried for info on running SQL sessions. It is // shared between the sql.Server and the statusServer. sessionRegistry *sql.SessionRegistry + closedSessionCache *sql.ClosedSessionCache jobRegistry *jobs.Registry startupMigrationsMgr *startupmigrations.Manager statsRefresher *stats.Refresher @@ -273,6 +274,9 @@ type sqlServerArgs struct { // Used for SHOW/CANCEL QUERIE(S)/SESSION(S). sessionRegistry *sql.SessionRegistry + // Used to store closed sessions. + closedSessionCache *sql.ClosedSessionCache + // Used to track the DistSQL flows scheduled on this node but initiated on // behalf of other nodes. flowScheduler *flowinfra.FlowScheduler @@ -728,6 +732,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { SQLStatusServer: cfg.sqlStatusServer, RegionsServer: cfg.regionsServer, SessionRegistry: cfg.sessionRegistry, + ClosedSessionCache: cfg.closedSessionCache, ContentionRegistry: contentionRegistry, SQLLiveness: cfg.sqlLivenessProvider, JobRegistry: jobRegistry, @@ -1061,6 +1066,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { tracingService: tracingService, tenantConnect: cfg.tenantConnect, sessionRegistry: cfg.sessionRegistry, + closedSessionCache: cfg.closedSessionCache, jobRegistry: jobRegistry, statsRefresher: statsRefresher, temporaryObjectCleaner: temporaryObjectCleaner, diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto index 5a82b523c255..f6c5ab35c2d2 100644 --- a/pkg/server/serverpb/status.proto +++ b/pkg/server/serverpb/status.proto @@ -907,6 +907,9 @@ message ListSessionsRequest { // The caller is responsible to normalize the username // (= case fold and perform unicode NFC normalization). string username = 1; + // Boolean to exclude closed sessions; if unspecified, defaults + // to false and closed sessions are included in the response. + bool exclude_closed_sessions = 2; } // Session represents one SQL session. @@ -944,6 +947,17 @@ message Session { // The SQL statement fingerprint of the last query executed on this session, // compatible with StatementStatisticsKey. string last_active_query_no_constants = 13; + // Enum for sessions status. + enum Status { + ACTIVE = 0; + CLOSED = 1; + IDLE = 2; + } + // The session's status. + Status status = 14; + // Timestamp of session's end. + google.protobuf.Timestamp end = 15 + [ (gogoproto.nullable) = true, (gogoproto.stdtime) = true ]; } // An error wrapper object for ListSessionsResponse. diff --git a/pkg/server/status.go b/pkg/server/status.go index 1cfc128f07c8..d8c5e2742717 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -143,13 +143,14 @@ type baseStatusServer struct { serverpb.UnimplementedStatusServer log.AmbientContext - privilegeChecker *adminPrivilegeChecker - sessionRegistry *sql.SessionRegistry - flowScheduler *flowinfra.FlowScheduler - st *cluster.Settings - sqlServer *SQLServer - rpcCtx *rpc.Context - stopper *stop.Stopper + privilegeChecker *adminPrivilegeChecker + sessionRegistry *sql.SessionRegistry + closedSessionCache *sql.ClosedSessionCache + flowScheduler *flowinfra.FlowScheduler + st *cluster.Settings + sqlServer *SQLServer + rpcCtx *rpc.Context + stopper *stop.Stopper } // getLocalSessions returns a list of local sessions on this node. Note that the @@ -199,9 +200,22 @@ func (b *baseStatusServer) getLocalSessions( // The empty username means "all sessions". showAll := reqUsername.Undefined() - registry := b.sessionRegistry - sessions := registry.SerializeAll() - userSessions := make([]serverpb.Session, 0, len(sessions)) + // In order to avoid duplicate sessions showing up as both open and closed, + // we lock the session registry to prevent any changes to it while we + // serialize the sessions from the session registry and the closed session + // cache. + b.sessionRegistry.Lock() + sessions := b.sessionRegistry.SerializeAllLocked() + + var closedSessions []serverpb.Session + if !req.ExcludeClosedSessions { + closedSessions = b.closedSessionCache.GetSerializedSessions() + } + + b.sessionRegistry.Unlock() + + userSessions := make([]serverpb.Session, 0, len(sessions)+len(closedSessions)) + sessions = append(sessions, closedSessions...) for _, session := range sessions { if reqUsername.Normalized() != session.Username && !showAll { @@ -220,6 +234,11 @@ func (b *baseStatusServer) getLocalSessions( userSessions = append(userSessions, session) } + + sort.Slice(userSessions, func(i, j int) bool { + return userSessions[i].Start.Before(userSessions[j].Start) + }) + return userSessions, nil } @@ -501,19 +520,21 @@ func newStatusServer( stores *kvserver.Stores, stopper *stop.Stopper, sessionRegistry *sql.SessionRegistry, + closedSessionCache *sql.ClosedSessionCache, flowScheduler *flowinfra.FlowScheduler, internalExecutor *sql.InternalExecutor, ) *statusServer { ambient.AddLogTag("status", nil) server := &statusServer{ baseStatusServer: &baseStatusServer{ - AmbientContext: ambient, - privilegeChecker: adminAuthzCheck, - sessionRegistry: sessionRegistry, - flowScheduler: flowScheduler, - st: st, - rpcCtx: rpcCtx, - stopper: stopper, + AmbientContext: ambient, + privilegeChecker: adminAuthzCheck, + sessionRegistry: sessionRegistry, + closedSessionCache: closedSessionCache, + flowScheduler: flowScheduler, + st: st, + rpcCtx: rpcCtx, + stopper: stopper, }, cfg: cfg, admin: adminServer, @@ -2744,6 +2765,9 @@ func (s *statusServer) listSessionsHelper( } sessions := nodeResp.([]serverpb.Session) response.Sessions = append(response.Sessions, sessions...) + sort.Slice(response.Sessions, func(i, j int) bool { + return response.Sessions[i].Start.Before(response.Sessions[j].Start) + }) } errorFn := func(nodeID roachpb.NodeID, err error) { errResponse := serverpb.ListSessionsError{NodeID: nodeID, Message: err.Error()} diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go index 799ba2b9cc6f..21aec0eea5b6 100644 --- a/pkg/server/status_test.go +++ b/pkg/server/status_test.go @@ -25,6 +25,7 @@ import ( "sort" "strconv" "strings" + "sync" "testing" "time" @@ -3270,6 +3271,140 @@ func TestStatusAPIListSessions(t *testing.T) { require.Equal(t, "SELECT 2", session.LastActiveQuery) } +func TestListClosedSessions(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // The active sessions might close before the stress race can finish. + skip.UnderStressRace(t, "active sessions") + + ctx := context.Background() + serverParams, _ := tests.CreateTestServerParams() + testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{ + ServerArgs: serverParams, + }) + defer testCluster.Stopper().Stop(ctx) + + server := testCluster.Server(0) + + doSessionsRequest := func(username string) serverpb.ListSessionsResponse { + var resp serverpb.ListSessionsResponse + path := "/_status/sessions?username=" + username + err := serverutils.GetJSONProto(server, path, &resp) + require.NoError(t, err) + return resp + } + + getUserConn := func(t *testing.T, username string, server serverutils.TestServerInterface) *gosql.DB { + pgURL := url.URL{ + Scheme: "postgres", + User: url.UserPassword(username, "hunter2"), + Host: server.ServingSQLAddr(), + } + db, err := gosql.Open("postgres", pgURL.String()) + require.NoError(t, err) + return db + } + + // Create a test user. + users := []string{"test_user_a", "test_user_b", "test_user_c"} + conn := testCluster.ServerConn(0) + _, err := conn.Exec(fmt.Sprintf(` +CREATE USER %s with password 'hunter2'; +CREATE USER %s with password 'hunter2'; +CREATE USER %s with password 'hunter2'; +`, users[0], users[1], users[2])) + require.NoError(t, err) + + var dbs []*gosql.DB + + // Open 10 sessions for the user and then close them. + for _, user := range users { + for i := 0; i < 10; i++ { + targetDB := getUserConn(t, user, testCluster.Server(0)) + dbs = append(dbs, targetDB) + sqlutils.MakeSQLRunner(targetDB).Exec(t, `SELECT version()`) + } + } + + for _, db := range dbs { + err := db.Close() + require.NoError(t, err) + } + + var wg sync.WaitGroup + + // Open 5 sessions for the user and leave them open by running pg_sleep(30). + for _, user := range users { + for i := 0; i < 5; i++ { + wg.Add(1) + go func(user string) { + // Open a session for the target user. + targetDB := getUserConn(t, user, testCluster.Server(0)) + defer targetDB.Close() + defer wg.Done() + sqlutils.MakeSQLRunner(targetDB).Exec(t, `SELECT pg_sleep(30)`) + }(user) + } + } + + // Open 3 sessions for the user and leave them idle by running version(). + for _, user := range users { + for i := 0; i < 3; i++ { + targetDB := getUserConn(t, user, testCluster.Server(0)) + defer targetDB.Close() + sqlutils.MakeSQLRunner(targetDB).Exec(t, `SELECT version()`) + } + } + + countSessionStatus := func(allSessions []serverpb.Session) (int, int, int) { + var active, idle, closed int + for _, s := range allSessions { + if s.Status.String() == "ACTIVE" { + active++ + } + // IDLE sessions are open sessions with no active queries. + if s.Status.String() == "IDLE" { + idle++ + } + if s.Status.String() == "CLOSED" { + closed++ + } + } + return active, idle, closed + } + + expectedIdle := 3 + expectedActive := 5 + expectedClosed := 10 + + testutils.SucceedsSoon(t, func() error { + for _, user := range users { + sessionsResponse := doSessionsRequest(user) + allSessions := sessionsResponse.Sessions + sort.Slice(allSessions, func(i, j int) bool { + return allSessions[i].Start.Before(allSessions[j].Start) + }) + + active, idle, closed := countSessionStatus(allSessions) + if idle != expectedIdle { + return errors.Newf("User: %s: Expected %d idle sessions, got %d\n", user, expectedIdle, idle) + } + if active != expectedActive { + return errors.Newf("User: %s: Expected %d active sessions, got %d\n", user, expectedActive, active) + } + if closed != expectedClosed { + return errors.Newf("User: %s: Expected %d closed sessions, got %d\n", user, expectedClosed, closed) + } + } + return nil + }) + + // Wait for the goroutines from the pg_sleep() command to finish, so we can + // safely close their connections. + wg.Wait() +} + func TestTransactionContentionEvents(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 1cc74076cbd7..a93c304625e4 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -143,6 +143,9 @@ func startTenantInternal( histogramWindowInterval: args.HistogramWindowInterval(), settings: args.Settings, }) + closedSessionCache := sql.NewClosedSessionCache( + baseCfg.Settings, args.monitorAndMetrics.rootSQLMemoryMonitor, time.Now) + args.closedSessionCache = closedSessionCache // Initialize gRPC server for use on shared port with pg grpcMain := newGRPCServer(args.rpcContext) @@ -208,7 +211,7 @@ func startTenantInternal( // the SQL server object. tenantStatusServer := newTenantStatusServer( baseCfg.AmbientCtx, &adminPrivilegeChecker{ie: args.circularInternalExecutor}, - args.sessionRegistry, args.flowScheduler, baseCfg.Settings, nil, + args.sessionRegistry, args.closedSessionCache, args.flowScheduler, baseCfg.Settings, nil, args.rpcContext, args.stopper, ) diff --git a/pkg/server/tenant_status.go b/pkg/server/tenant_status.go index e5d4e2093282..d0e9df2171e2 100644 --- a/pkg/server/tenant_status.go +++ b/pkg/server/tenant_status.go @@ -81,6 +81,7 @@ func newTenantStatusServer( ambient log.AmbientContext, privilegeChecker *adminPrivilegeChecker, sessionRegistry *sql.SessionRegistry, + closedSessionCache *sql.ClosedSessionCache, flowScheduler *flowinfra.FlowScheduler, st *cluster.Settings, sqlServer *SQLServer, @@ -90,14 +91,15 @@ func newTenantStatusServer( ambient.AddLogTag("tenant-status", nil) return &tenantStatusServer{ baseStatusServer: baseStatusServer{ - AmbientContext: ambient, - privilegeChecker: privilegeChecker, - sessionRegistry: sessionRegistry, - flowScheduler: flowScheduler, - st: st, - sqlServer: sqlServer, - rpcCtx: rpcCtx, - stopper: stopper, + AmbientContext: ambient, + privilegeChecker: privilegeChecker, + sessionRegistry: sessionRegistry, + closedSessionCache: closedSessionCache, + flowScheduler: flowScheduler, + st: st, + sqlServer: sqlServer, + rpcCtx: rpcCtx, + stopper: stopper, }, } } diff --git a/pkg/sql/closed_session_cache.go b/pkg/sql/closed_session_cache.go index f851d6db15cd..2ac9c5a573ba 100644 --- a/pkg/sql/closed_session_cache.go +++ b/pkg/sql/closed_session_cache.go @@ -11,13 +11,17 @@ package sql import ( + "context" "time" + "unsafe" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/cache" + "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) type timeSource func() time.Time @@ -28,7 +32,7 @@ var ClosedSessionCacheCapacity = settings.RegisterIntSetting( settings.TenantWritable, "sql.closed_session_cache.capacity", "the maximum number of sessions in the cache", - 100, // TODO(gtr): Totally arbitrary for now, adjust later. + 1000, // TODO(gtr): Totally arbitrary for now, adjust later. ).WithPublic() // ClosedSessionCacheTimeToLive is the cluster setting that controls the maximum time @@ -47,32 +51,57 @@ type ClosedSessionCache struct { mu struct { syncutil.RWMutex + acc mon.BoundAccount data *cache.UnorderedCache } + + mon *mon.BytesMonitor } // NewClosedSessionCache returns a new ClosedSessionCache. -func NewClosedSessionCache(st *cluster.Settings, timeSrc timeSource) *ClosedSessionCache { - c := &ClosedSessionCache{st: st, timeSrc: timeSrc} +func NewClosedSessionCache( + st *cluster.Settings, parentMon *mon.BytesMonitor, timeSrc timeSource, +) *ClosedSessionCache { + monitor := mon.NewMonitorInheritWithLimit("closed-session-cache", 0 /* limit*/, parentMon) + c := &ClosedSessionCache{st: st, timeSrc: timeSrc} c.mu.data = cache.NewUnorderedCache(cache.Config{ Policy: cache.CacheFIFO, ShouldEvict: func(size int, _, _ interface{}) bool { capacity := ClosedSessionCacheCapacity.Get(&st.SV) return int64(size) > capacity }, + OnEvictedEntry: func(entry *cache.Entry) { + node := entry.Value.(*sessionNode) + size := int64(node.size()) + c.mu.acc.Shrink(context.Background(), size) + }, }) + c.mu.acc = monitor.MakeBoundAccount() + c.mon = monitor + c.mon.Start(context.Background(), parentMon, mon.BoundAccount{}) return c } -// Add adds a closed session to the ClosedSessionCache. -func (c *ClosedSessionCache) Add(id ClusterWideID, session serverpb.Session) { +// add adds a closed session to the ClosedSessionCache. +func (c *ClosedSessionCache) add( + ctx context.Context, id ClusterWideID, session serverpb.Session, +) error { c.mu.Lock() defer c.mu.Unlock() + end := timeutil.Now() + session.End = &end + session.Status = serverpb.Session_CLOSED node := &sessionNode{id: id, data: session, timestamp: c.timeSrc()} + + if err := c.mu.acc.Grow(ctx, int64(node.size())); err != nil { + return err + } + c.mu.data.Add(id, node) + return nil } func (c *ClosedSessionCache) size() int { @@ -146,3 +175,11 @@ func (n *sessionNode) getAge(now timeSource) time.Duration { func (n *sessionNode) getAgeString(now timeSource) string { return n.getAge(now).Round(time.Second).String() } + +func (n *sessionNode) size() int { + size := 0 + size += int(unsafe.Sizeof(ClusterWideID{})) + size += n.data.Size() + size += int(unsafe.Sizeof(time.Time{})) + return size +} diff --git a/pkg/sql/closed_session_cache_test.go b/pkg/sql/closed_session_cache_test.go index d6fbac8ccc93..00690e7edc67 100644 --- a/pkg/sql/closed_session_cache_test.go +++ b/pkg/sql/closed_session_cache_test.go @@ -13,6 +13,7 @@ package sql import ( "context" "fmt" + "math" "strings" "testing" "time" @@ -21,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/uint128" "github.com/cockroachdb/datadriven" "github.com/stretchr/testify/require" @@ -40,15 +42,24 @@ func TestSessionCacheBasic(t *testing.T) { datadriven.Walk(t, testutils.TestDataPath(t, "closed_session_cache"), func(t *testing.T, path string) { datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + ctx := context.Background() switch d.Cmd { case "init": var capacity, timeToLive int d.ScanArgs(t, "capacity", &capacity) d.ScanArgs(t, "timeToLive", &timeToLive) - ctx := context.Background() st := &cluster.Settings{} - cache = NewClosedSessionCache(st, time.Now) + monitor := mon.NewUnlimitedMonitor( + ctx, + "test", + mon.MemoryResource, + nil, /* currCount */ + nil, /* maxHist */ + math.MaxInt64, + st, + ) + cache = NewClosedSessionCache(st, monitor, time.Now) ClosedSessionCacheCapacity.Override(ctx, &st.SV, int64(capacity)) ClosedSessionCacheTimeToLive.Override(ctx, &st.SV, int64(timeToLive)) @@ -62,7 +73,8 @@ func TestSessionCacheBasic(t *testing.T) { session := serverpb.Session{} sessionID := ClusterWideID{id} - cache.Add(sessionID, session) + err = cache.add(ctx, sessionID, session) + require.NoError(t, err) return fmt.Sprintf("cache_size: %d", cache.size()) case "addSessionBatch": @@ -79,7 +91,8 @@ func TestSessionCacheBasic(t *testing.T) { cache.timeSrc = addSeconds(cache.timeSrc, seconds) session := serverpb.Session{} sessionID := ClusterWideID{id} - cache.Add(sessionID, session) + err := cache.add(ctx, sessionID, session) + require.NoError(t, err) id = id.Add(1) } diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 2eef8c9b3165..90b35e5b1024 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1778,7 +1778,7 @@ func (ex *connExecutor) run( parentMon *mon.BytesMonitor, reserved mon.BoundAccount, onCancel context.CancelFunc, -) error { +) (err error) { if !ex.activated { ex.activate(ctx, parentMon, reserved) } @@ -1788,7 +1788,15 @@ func (ex *connExecutor) run( ex.sessionID = ex.generateID() ex.server.cfg.SessionRegistry.register(ex.sessionID, ex.queryCancelKey, ex) ex.planner.extendedEvalCtx.setSessionID(ex.sessionID) - defer ex.server.cfg.SessionRegistry.deregister(ex.sessionID, ex.queryCancelKey) + + defer func() { + ex.server.cfg.SessionRegistry.deregister(ex.sessionID, ex.queryCancelKey) + addErr := ex.server.cfg.ClosedSessionCache.add(ctx, ex.sessionID, ex.serialize()) + if addErr != nil { + err = errors.CombineErrors(err, addErr) + } + }() + for { ex.curStmtAST = nil if err := ctx.Err(); err != nil { @@ -3089,6 +3097,10 @@ func (ex *connExecutor) serialize() serverpb.Session { lastActiveQuery = truncateSQL(ex.mu.LastActiveQuery.String()) lastActiveQueryNoConstants = truncateSQL(formatStatementHideConstants(ex.mu.LastActiveQuery)) } + status := serverpb.Session_IDLE + if len(activeQueries) > 0 { + status = serverpb.Session_ACTIVE + } // We always use base here as the fields from the SessionData should always // be that of the root session. @@ -3111,6 +3123,7 @@ func (ex *connExecutor) serialize() serverpb.Session { AllocBytes: ex.mon.AllocBytes(), MaxAllocBytes: ex.mon.MaximumBytes(), LastActiveQueryNoConstants: lastActiveQueryNoConstants, + Status: status, } } diff --git a/pkg/sql/conn_executor_internal_test.go b/pkg/sql/conn_executor_internal_test.go index 73779f9c76cd..6fe480d55751 100644 --- a/pkg/sql/conn_executor_internal_test.go +++ b/pkg/sql/conn_executor_internal_test.go @@ -280,6 +280,12 @@ func startConnExecutor( } defer tempEngine.Close() ambientCtx := log.MakeTestingAmbientCtxWithNewTracer() + pool := mon.NewUnlimitedMonitor( + context.Background(), "test", mon.MemoryResource, + nil /* curCount */, nil /* maxHist */, math.MaxInt64, st, + ) + // This pool should never be Stop()ed because, if the test is failing, memory + // is not properly released. cfg := &ExecutorConfig{ AmbientCtx: ambientCtx, Settings: st, @@ -288,7 +294,8 @@ func startConnExecutor( SystemConfig: config.NewConstantSystemConfigProvider( config.NewSystemConfig(zonepb.DefaultZoneConfigRef()), ), - SessionRegistry: NewSessionRegistry(), + SessionRegistry: NewSessionRegistry(), + ClosedSessionCache: NewClosedSessionCache(st, pool, time.Now), NodeInfo: NodeInfo{ NodeID: nodeID, LogicalClusterID: func() uuid.UUID { return uuid.UUID{} }, @@ -326,12 +333,6 @@ func startConnExecutor( HistogramWindowInterval: base.DefaultHistogramWindowInterval(), CollectionFactory: descs.NewBareBonesCollectionFactory(st, keys.SystemSQLCodec), } - pool := mon.NewUnlimitedMonitor( - context.Background(), "test", mon.MemoryResource, - nil /* curCount */, nil /* maxHist */, math.MaxInt64, st, - ) - // This pool should never be Stop()ed because, if the test is failing, memory - // is not properly released. s := NewServer(cfg, pool) buf := NewStmtBuf() diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 7a5894ec8e22..88acfa7f02f5 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -1625,7 +1625,7 @@ var crdbInternalLocalTxnsTable = virtualSchemaTable{ if err := p.RequireAdminRole(ctx, "read crdb_internal.node_transactions"); err != nil { return err } - req, err := p.makeSessionsRequest(ctx) + req, err := p.makeSessionsRequest(ctx, true /* excludeClosed */) if err != nil { return err } @@ -1644,7 +1644,7 @@ var crdbInternalClusterTxnsTable = virtualSchemaTable{ if err := p.RequireAdminRole(ctx, "read crdb_internal.cluster_transactions"); err != nil { return err } - req, err := p.makeSessionsRequest(ctx) + req, err := p.makeSessionsRequest(ctx, true /* excludeClosed */) if err != nil { return err } @@ -1719,8 +1719,13 @@ CREATE TABLE crdb_internal.%s ( phase STRING -- the current execution phase )` -func (p *planner) makeSessionsRequest(ctx context.Context) (serverpb.ListSessionsRequest, error) { - req := serverpb.ListSessionsRequest{Username: p.SessionData().User().Normalized()} +func (p *planner) makeSessionsRequest( + ctx context.Context, excludeClosed bool, +) (serverpb.ListSessionsRequest, error) { + req := serverpb.ListSessionsRequest{ + Username: p.SessionData().User().Normalized(), + ExcludeClosedSessions: excludeClosed, + } hasAdmin, err := p.HasAdminRole(ctx) if err != nil { return serverpb.ListSessionsRequest{}, err @@ -1770,7 +1775,7 @@ var crdbInternalLocalQueriesTable = virtualSchemaTable{ comment: "running queries visible by current user (RAM; local node only)", schema: fmt.Sprintf(queriesSchemaPattern, "node_queries"), populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error { - req, err := p.makeSessionsRequest(ctx) + req, err := p.makeSessionsRequest(ctx, true /* excludeClosed */) if err != nil { return err } @@ -1788,7 +1793,7 @@ var crdbInternalClusterQueriesTable = virtualSchemaTable{ comment: "running queries visible by current user (cluster RPC; expensive!)", schema: fmt.Sprintf(queriesSchemaPattern, "cluster_queries"), populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error { - req, err := p.makeSessionsRequest(ctx) + req, err := p.makeSessionsRequest(ctx, true /* excludeClosed */) if err != nil { return err } @@ -1889,7 +1894,9 @@ CREATE TABLE crdb_internal.%s ( oldest_query_start TIMESTAMP, -- the time when the oldest query in the session was started kv_txn STRING, -- the ID of the current KV transaction alloc_bytes INT, -- the number of bytes allocated by the session - max_alloc_bytes INT -- the high water mark of bytes allocated by the session + max_alloc_bytes INT, -- the high water mark of bytes allocated by the session + status STRING, -- the status of the session (open, closed) + session_end TIMESTAMP -- the time when the session was closed ) ` @@ -1899,7 +1906,7 @@ var crdbInternalLocalSessionsTable = virtualSchemaTable{ comment: "running sessions visible by current user (RAM; local node only)", schema: fmt.Sprintf(sessionsSchemaPattern, "node_sessions"), populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error { - req, err := p.makeSessionsRequest(ctx) + req, err := p.makeSessionsRequest(ctx, true /* excludeClosed */) if err != nil { return err } @@ -1917,7 +1924,7 @@ var crdbInternalClusterSessionsTable = virtualSchemaTable{ comment: "running sessions visible to current user (cluster RPC; expensive!)", schema: fmt.Sprintf(sessionsSchemaPattern, "cluster_sessions"), populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error { - req, err := p.makeSessionsRequest(ctx) + req, err := p.makeSessionsRequest(ctx, true /* excludeClosed */) if err != nil { return err } @@ -1969,6 +1976,13 @@ func populateSessionsTable( if err != nil { return err } + endTSDatum := tree.DNull + if session.End != nil { + endTSDatum, err = tree.MakeDTimestamp(*session.End, time.Microsecond) + if err != nil { + return err + } + } if err := addRow( tree.NewDInt(tree.DInt(session.NodeID)), sessionID, @@ -1982,6 +1996,8 @@ func populateSessionsTable( kvTxnIDDatum, tree.NewDInt(tree.DInt(session.AllocBytes)), tree.NewDInt(tree.DInt(session.MaxAllocBytes)), + tree.NewDString(session.Status.String()), + endTSDatum, ); err != nil { return err } @@ -2005,6 +2021,8 @@ func populateSessionsTable( tree.DNull, // kv_txn tree.DNull, // alloc_bytes tree.DNull, // max_alloc_bytes + tree.DNull, // status + tree.DNull, // session end ); err != nil { return err } diff --git a/pkg/sql/delegate/show_sessions.go b/pkg/sql/delegate/show_sessions.go index be8dbe806f48..9dc08aba57c7 100644 --- a/pkg/sql/delegate/show_sessions.go +++ b/pkg/sql/delegate/show_sessions.go @@ -16,7 +16,7 @@ import ( ) func (d *delegator) delegateShowSessions(n *tree.ShowSessions) (tree.Statement, error) { - const query = `SELECT node_id, session_id, user_name, client_address, application_name, active_queries, last_active_query, session_start, oldest_query_start FROM crdb_internal.` + const query = `SELECT node_id, session_id, status, user_name, client_address, application_name, active_queries, last_active_query, session_start, oldest_query_start FROM crdb_internal.` table := `node_sessions` if n.Cluster { table = `cluster_sessions` diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index bad602fc6606..26db6e574323 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1178,6 +1178,7 @@ type ExecutorConfig struct { RegionsServer serverpb.RegionsServer MetricsRecorder nodeStatusGenerator SessionRegistry *SessionRegistry + ClosedSessionCache *ClosedSessionCache SQLLiveness sqlliveness.Liveness JobRegistry *jobs.Registry VirtualSchemas *VirtualSchemaHolder @@ -2032,6 +2033,11 @@ func (r *SessionRegistry) SerializeAll() []serverpb.Session { r.Lock() defer r.Unlock() + return r.SerializeAllLocked() +} + +// SerializeAllLocked is like SerializeAll but assumes SessionRegistry's mutex is locked. +func (r *SessionRegistry) SerializeAllLocked() []serverpb.Session { response := make([]serverpb.Session, 0, len(r.sessions)) for _, s := range r.sessions { diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal b/pkg/sql/logictest/testdata/logic_test/crdb_internal index 381929f40a45..e29788fbe7f1 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal @@ -363,15 +363,15 @@ SELECT * FROM crdb_internal.cluster_transactions WHERE node_id < 0 ---- id node_id session_id start txn_string application_name num_stmts num_retries num_auto_retries -query ITTTTTTTTTTT colnames +query ITTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.node_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes +node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end -query ITTTTTTTTTTT colnames +query ITTTTTTTTTTTTT colnames SELECT * FROM crdb_internal.cluster_sessions WHERE node_id < 0 ---- -node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes +node_id session_id user_name client_address application_name active_queries last_active_query session_start oldest_query_start kv_txn alloc_bytes max_alloc_bytes status session_end query IIITTTI colnames SELECT * FROM crdb_internal.node_contention_events WHERE table_id < 0 diff --git a/pkg/sql/logictest/testdata/logic_test/create_statements b/pkg/sql/logictest/testdata/logic_test/create_statements index 77d5cff749a9..ed7e14ad5c48 100644 --- a/pkg/sql/logictest/testdata/logic_test/create_statements +++ b/pkg/sql/logictest/testdata/logic_test/create_statements @@ -287,7 +287,9 @@ CREATE TABLE crdb_internal.cluster_sessions ( oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, alloc_bytes INT8 NULL, - max_alloc_bytes INT8 NULL + max_alloc_bytes INT8 NULL, + status STRING NULL, + session_end TIMESTAMP NULL ) CREATE TABLE crdb_internal.cluster_sessions ( node_id INT8 NOT NULL, session_id STRING NULL, @@ -300,7 +302,9 @@ CREATE TABLE crdb_internal.cluster_sessions ( oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, alloc_bytes INT8 NULL, - max_alloc_bytes INT8 NULL + max_alloc_bytes INT8 NULL, + status STRING NULL, + session_end TIMESTAMP NULL ) {} {} CREATE TABLE crdb_internal.cluster_settings ( variable STRING NOT NULL, @@ -906,7 +910,9 @@ CREATE TABLE crdb_internal.node_sessions ( oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, alloc_bytes INT8 NULL, - max_alloc_bytes INT8 NULL + max_alloc_bytes INT8 NULL, + status STRING NULL, + session_end TIMESTAMP NULL ) CREATE TABLE crdb_internal.node_sessions ( node_id INT8 NOT NULL, session_id STRING NULL, @@ -919,7 +925,9 @@ CREATE TABLE crdb_internal.node_sessions ( oldest_query_start TIMESTAMP NULL, kv_txn STRING NULL, alloc_bytes INT8 NULL, - max_alloc_bytes INT8 NULL + max_alloc_bytes INT8 NULL, + status STRING NULL, + session_end TIMESTAMP NULL ) {} {} CREATE TABLE crdb_internal.node_statement_statistics ( node_id INT8 NOT NULL, diff --git a/pkg/sql/temporary_schema.go b/pkg/sql/temporary_schema.go index bf007cd955b9..d423c5f9866d 100644 --- a/pkg/sql/temporary_schema.go +++ b/pkg/sql/temporary_schema.go @@ -569,7 +569,9 @@ func (c *TemporaryObjectCleaner) doTemporaryObjectCleanup( var err error response, err = c.statusServer.ListSessions( ctx, - &serverpb.ListSessionsRequest{}, + &serverpb.ListSessionsRequest{ + ExcludeClosedSessions: true, + }, ) if response != nil && len(response.Errors) > 0 && err == nil { diff --git a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts index c9f3c9b3010e..05140c7c720f 100644 --- a/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts +++ b/pkg/ui/workspaces/cluster-ui/src/sessions/sessionsPage.fixture.ts @@ -21,6 +21,8 @@ import { CancelSessionRequestMessage, } from "src/api/terminateQueryApi"; +import Status = cockroach.server.serverpb.Session.Status; + const history = createMemoryHistory({ initialEntries: ["/sessions"] }); const toUuid = function(s: string): Uint8Array { @@ -45,6 +47,7 @@ export const idleSession: SessionInfo = { alloc_bytes: Long.fromNumber(0), max_alloc_bytes: Long.fromNumber(10240), active_queries: [], + status: Status.ACTIVE, toJSON: () => ({}), }, }; @@ -82,6 +85,7 @@ export const idleTransactionSession: SessionInfo = { }, last_active_query_no_constants: "SHOW database", active_queries: [], + status: Status.ACTIVE, toJSON: () => ({}), }, }; @@ -133,6 +137,7 @@ export const activeSession: SessionInfo = { num_auto_retries: 3, }, last_active_query_no_constants: "SHOW database", + status: Status.ACTIVE, toJSON: () => ({}), }, };