diff --git a/pkg/cli/clisqlclient/statement_diag.go b/pkg/cli/clisqlclient/statement_diag.go index 3c514f8094ca..6741950dbcff 100644 --- a/pkg/cli/clisqlclient/statement_diag.go +++ b/pkg/cli/clisqlclient/statement_diag.go @@ -13,7 +13,6 @@ package clisqlclient import ( "context" "database/sql/driver" - "fmt" "io" "os" "time" @@ -100,51 +99,22 @@ func StmtDiagListOutstandingRequests( return result, nil } -// TODO(yuzefovich): remove this in 22.2. -func isAtLeast22dot1ClusterVersion(ctx context.Context, conn Conn) (bool, error) { - // Check whether the upgrade to add the conditional diagnostics columns to - // the statement_diagnostics_requests system table has already been run. - row, err := conn.QueryRow(ctx, ` -SELECT - count(*) -FROM - [SHOW COLUMNS FROM system.statement_diagnostics_requests] -WHERE - column_name = 'min_execution_latency';`) - if err != nil { - return false, err - } - c, ok := row[0].(int64) - if !ok { - return false, nil - } - return c == 1, nil -} - func stmtDiagListOutstandingRequestsInternal( ctx context.Context, conn Conn, ) ([]StmtDiagActivationRequest, error) { - var extraColumns string - atLeast22dot1, err := isAtLeast22dot1ClusterVersion(ctx, conn) - if err != nil { - return nil, err - } - if atLeast22dot1 { - // Converting an INTERVAL to a number of milliseconds within that - // interval is a pain - we extract the number of seconds and multiply it - // by 1000, then we extract the number of milliseconds and add that up - // to the previous result; however, we have now double counted the - // seconds field, so we have to remove that times 1000. - getMilliseconds := `EXTRACT(epoch FROM min_execution_latency)::INT8 * 1000 + + // Converting an INTERVAL to a number of milliseconds within that interval + // is a pain - we extract the number of seconds and multiply it by 1000, + // then we extract the number of milliseconds and add that up to the + // previous result; however, we have now double counted the seconds field, + // so we have to remove that times 1000. + getMilliseconds := `EXTRACT(epoch FROM min_execution_latency)::INT8 * 1000 + EXTRACT(millisecond FROM min_execution_latency)::INT8 - EXTRACT(second FROM min_execution_latency)::INT8 * 1000` - extraColumns = ", " + getMilliseconds + ", expires_at" - } rows, err := conn.Query(ctx, - fmt.Sprintf(`SELECT id, statement_fingerprint, requested_at%s - FROM system.statement_diagnostics_requests - WHERE NOT completed - ORDER BY requested_at DESC`, extraColumns), + "SELECT id, statement_fingerprint, requested_at, "+getMilliseconds+`, expires_at + FROM system.statement_diagnostics_requests + WHERE NOT completed + ORDER BY requested_at DESC`, ) if err != nil { return nil, err @@ -159,13 +129,11 @@ func stmtDiagListOutstandingRequestsInternal( } var minExecutionLatency time.Duration var expiresAt time.Time - if atLeast22dot1 { - if ms, ok := vals[3].(int64); ok { - minExecutionLatency = time.Millisecond * time.Duration(ms) - } - if e, ok := vals[4].(time.Time); ok { - expiresAt = e - } + if ms, ok := vals[3].(int64); ok { + minExecutionLatency = time.Millisecond * time.Duration(ms) + } + if e, ok := vals[4].(time.Time); ok { + expiresAt = e } info := StmtDiagActivationRequest{ ID: vals[0].(int64), diff --git a/pkg/server/statement_diagnostics_requests.go b/pkg/server/statement_diagnostics_requests.go index 268d3aa30c27..5584d81e46a5 100644 --- a/pkg/server/statement_diagnostics_requests.go +++ b/pkg/server/statement_diagnostics_requests.go @@ -12,10 +12,8 @@ package server import ( "context" - "fmt" "time" - "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -131,27 +129,21 @@ func (s *statusServer) StatementDiagnosticsRequests( var err error - // TODO(yuzefovich): remove this version gating in 22.2. - var extraColumns string - if s.admin.server.st.Version.IsActive(ctx, clusterversion.AlterSystemStmtDiagReqs) { - extraColumns = `, - min_execution_latency, - expires_at` - } - // TODO(davidh): Add pagination to this request. it, err := s.internalExecutor.QueryIteratorEx(ctx, "stmt-diag-get-all", nil, /* txn */ sessiondata.InternalExecutorOverride{ User: username.RootUserName(), }, - fmt.Sprintf(`SELECT + `SELECT id, statement_fingerprint, completed, statement_diagnostics_id, - requested_at%s + requested_at, + min_execution_latency, + expires_at FROM - system.statement_diagnostics_requests`, extraColumns)) + system.statement_diagnostics_requests`) if err != nil { return nil, err } @@ -175,16 +167,14 @@ func (s *statusServer) StatementDiagnosticsRequests( if requestedAt, ok := row[4].(*tree.DTimestampTZ); ok { req.RequestedAt = requestedAt.Time } - if extraColumns != "" { - if minExecutionLatency, ok := row[5].(*tree.DInterval); ok { - req.MinExecutionLatency = time.Duration(minExecutionLatency.Duration.Nanos()) - } - if expiresAt, ok := row[6].(*tree.DTimestampTZ); ok { - req.ExpiresAt = expiresAt.Time - // Don't return already expired requests. - if req.ExpiresAt.Before(timeutil.Now()) { - continue - } + if minExecutionLatency, ok := row[5].(*tree.DInterval); ok { + req.MinExecutionLatency = time.Duration(minExecutionLatency.Duration.Nanos()) + } + if expiresAt, ok := row[6].(*tree.DTimestampTZ); ok { + req.ExpiresAt = expiresAt.Time + // Don't return already expired requests. + if req.ExpiresAt.Before(timeutil.Now()) { + continue } } diff --git a/pkg/sql/row/BUILD.bazel b/pkg/sql/row/BUILD.bazel index 3c7cef433b7e..b4147f6458bf 100644 --- a/pkg/sql/row/BUILD.bazel +++ b/pkg/sql/row/BUILD.bazel @@ -23,7 +23,6 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/row", visibility = ["//visibility:public"], deps = [ - "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/keys", diff --git a/pkg/sql/row/kv_batch_streamer.go b/pkg/sql/row/kv_batch_streamer.go index 9349a99b143e..94330314d13a 100644 --- a/pkg/sql/row/kv_batch_streamer.go +++ b/pkg/sql/row/kv_batch_streamer.go @@ -13,7 +13,6 @@ package row import ( "context" - "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" @@ -24,13 +23,11 @@ import ( // CanUseStreamer returns whether the kvstreamer.Streamer API should be used. func CanUseStreamer(ctx context.Context, settings *cluster.Settings) bool { - // TODO(yuzefovich): remove the version gate in 22.2 cycle. - return settings.Version.IsActive(ctx, clusterversion.ScanWholeRows) && - useStreamerEnabled.Get(&settings.SV) + return useStreamerEnabled.Get(&settings.SV) } // useStreamerEnabled determines whether the Streamer API should be used. -// TODO(yuzefovich): remove this in 22.2. +// TODO(yuzefovich): remove this in 23.1. var useStreamerEnabled = settings.RegisterBoolSetting( settings.TenantReadOnly, "sql.distsql.use_streamer.enabled", diff --git a/pkg/sql/stmtdiagnostics/BUILD.bazel b/pkg/sql/stmtdiagnostics/BUILD.bazel index 49623d067569..203a7e623fed 100644 --- a/pkg/sql/stmtdiagnostics/BUILD.bazel +++ b/pkg/sql/stmtdiagnostics/BUILD.bazel @@ -6,7 +6,6 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics", visibility = ["//visibility:public"], deps = [ - "//pkg/clusterversion", "//pkg/gossip", "//pkg/kv", "//pkg/roachpb", diff --git a/pkg/sql/stmtdiagnostics/statement_diagnostics.go b/pkg/sql/stmtdiagnostics/statement_diagnostics.go index 6b8950b5b7e1..a3b46e31188e 100644 --- a/pkg/sql/stmtdiagnostics/statement_diagnostics.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics.go @@ -16,7 +16,6 @@ import ( "fmt" "time" - "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -195,11 +194,6 @@ func (r *Registry) poll(ctx context.Context) { } } -// TODO(yuzefovich): remove this in 22.2. -func (r *Registry) isMinExecutionLatencySupported(ctx context.Context) bool { - return r.st.Version.IsActive(ctx, clusterversion.AlterSystemStmtDiagReqs) -} - // RequestID is the ID of a diagnostics request, corresponding to the id // column in statement_diagnostics_requests. // A zero ID is invalid. @@ -285,29 +279,19 @@ func (r *Registry) insertRequestInternal( return 0, err } - if !r.isMinExecutionLatencySupported(ctx) { - if minExecutionLatency != 0 || expiresAfter != 0 { - return 0, errors.New( - "conditional statement diagnostics are only supported " + - "after 22.1 version migrations have completed", - ) - } - } - var reqID RequestID var expiresAt time.Time err = r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { // Check if there's already a pending request for this fingerprint. - var extraConditions string - if r.isMinExecutionLatencySupported(ctx) { - extraConditions = " AND (expires_at IS NULL OR expires_at > now())" - } row, err := r.ie.QueryRowEx(ctx, "stmt-diag-check-pending", txn, sessiondata.InternalExecutorOverride{ User: username.RootUserName(), }, - fmt.Sprintf("SELECT count(1) FROM system.statement_diagnostics_requests "+ - "WHERE completed = false AND statement_fingerprint = $1%s", extraConditions), + `SELECT count(1) FROM system.statement_diagnostics_requests + WHERE + completed = false AND + statement_fingerprint = $1 AND + (expires_at IS NULL OR expires_at > now())`, stmtFingerprint) if err != nil { return err @@ -386,14 +370,6 @@ func (r *Registry) CancelRequest(ctx context.Context, requestID int64) error { return err } - if !r.isMinExecutionLatencySupported(ctx) { - // If conditional diagnostics are not supported for this cluster yet, - // then we cannot cancel the request. - return errors.New( - "statement diagnostics can only be canceled after 22.1 version migrations have completed", - ) - } - row, err := r.ie.QueryRowEx(ctx, "stmt-diag-cancel-request", nil, /* txn */ sessiondata.InternalExecutorOverride{ User: username.RootUserName(), @@ -636,25 +612,20 @@ func (r *Registry) InsertStatementDiagnostics( // updates r.mu.requests accordingly. func (r *Registry) pollRequests(ctx context.Context) error { var rows []tree.Datums - isMinExecutionLatencySupported := r.isMinExecutionLatencySupported(ctx) // Loop until we run the query without straddling an epoch increment. for { r.mu.Lock() epoch := r.mu.epoch r.mu.Unlock() - var extraColumns string - var extraConditions string - if isMinExecutionLatencySupported { - extraColumns = ", min_execution_latency, expires_at" - extraConditions = " AND (expires_at IS NULL OR expires_at > now())" - } it, err := r.ie.QueryIteratorEx(ctx, "stmt-diag-poll", nil, /* txn */ sessiondata.InternalExecutorOverride{ User: username.RootUserName(), }, - fmt.Sprintf("SELECT id, statement_fingerprint%s FROM system.statement_diagnostics_requests "+ - "WHERE completed = false%s", extraColumns, extraConditions)) + `SELECT id, statement_fingerprint, min_execution_latency, expires_at + FROM system.statement_diagnostics_requests + WHERE completed = false AND (expires_at IS NULL OR expires_at > now())`, + ) if err != nil { return err } @@ -686,13 +657,11 @@ func (r *Registry) pollRequests(ctx context.Context) error { stmtFingerprint := string(*row[1].(*tree.DString)) var minExecutionLatency time.Duration var expiresAt time.Time - if isMinExecutionLatencySupported { - if minExecLatency, ok := row[2].(*tree.DInterval); ok { - minExecutionLatency = time.Duration(minExecLatency.Nanos()) - } - if e, ok := row[3].(*tree.DTimestampTZ); ok { - expiresAt = e.Time - } + if minExecLatency, ok := row[2].(*tree.DInterval); ok { + minExecutionLatency = time.Duration(minExecLatency.Nanos()) + } + if e, ok := row[3].(*tree.DTimestampTZ); ok { + expiresAt = e.Time } ids.Add(int(id)) r.addRequestInternalLocked(ctx, id, stmtFingerprint, minExecutionLatency, expiresAt)