From 077025470af81b32634880be250b92964ec1db05 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 19 May 2022 16:10:48 -0700 Subject: [PATCH] sql,stmtdiagnostics: remove some version gates This commit addresses several TODOs with my name on them about removing the version gates, mostly around the conditional statement diagnostics introduced in 22.1 cycle. Release note: None --- pkg/cli/clisqlclient/statement_diag.go | 62 +++++-------------- pkg/server/statement_diagnostics_requests.go | 36 ++++------- pkg/sql/row/BUILD.bazel | 1 - pkg/sql/row/kv_batch_streamer.go | 7 +-- pkg/sql/stmtdiagnostics/BUILD.bazel | 1 - .../stmtdiagnostics/statement_diagnostics.go | 59 +++++------------- 6 files changed, 44 insertions(+), 122 deletions(-) diff --git a/pkg/cli/clisqlclient/statement_diag.go b/pkg/cli/clisqlclient/statement_diag.go index 3c514f8094ca..6741950dbcff 100644 --- a/pkg/cli/clisqlclient/statement_diag.go +++ b/pkg/cli/clisqlclient/statement_diag.go @@ -13,7 +13,6 @@ package clisqlclient import ( "context" "database/sql/driver" - "fmt" "io" "os" "time" @@ -100,51 +99,22 @@ func StmtDiagListOutstandingRequests( return result, nil } -// TODO(yuzefovich): remove this in 22.2. -func isAtLeast22dot1ClusterVersion(ctx context.Context, conn Conn) (bool, error) { - // Check whether the upgrade to add the conditional diagnostics columns to - // the statement_diagnostics_requests system table has already been run. - row, err := conn.QueryRow(ctx, ` -SELECT - count(*) -FROM - [SHOW COLUMNS FROM system.statement_diagnostics_requests] -WHERE - column_name = 'min_execution_latency';`) - if err != nil { - return false, err - } - c, ok := row[0].(int64) - if !ok { - return false, nil - } - return c == 1, nil -} - func stmtDiagListOutstandingRequestsInternal( ctx context.Context, conn Conn, ) ([]StmtDiagActivationRequest, error) { - var extraColumns string - atLeast22dot1, err := isAtLeast22dot1ClusterVersion(ctx, conn) - if err != nil { - return nil, err - } - if atLeast22dot1 { - // Converting an INTERVAL to a number of milliseconds within that - // interval is a pain - we extract the number of seconds and multiply it - // by 1000, then we extract the number of milliseconds and add that up - // to the previous result; however, we have now double counted the - // seconds field, so we have to remove that times 1000. - getMilliseconds := `EXTRACT(epoch FROM min_execution_latency)::INT8 * 1000 + + // Converting an INTERVAL to a number of milliseconds within that interval + // is a pain - we extract the number of seconds and multiply it by 1000, + // then we extract the number of milliseconds and add that up to the + // previous result; however, we have now double counted the seconds field, + // so we have to remove that times 1000. + getMilliseconds := `EXTRACT(epoch FROM min_execution_latency)::INT8 * 1000 + EXTRACT(millisecond FROM min_execution_latency)::INT8 - EXTRACT(second FROM min_execution_latency)::INT8 * 1000` - extraColumns = ", " + getMilliseconds + ", expires_at" - } rows, err := conn.Query(ctx, - fmt.Sprintf(`SELECT id, statement_fingerprint, requested_at%s - FROM system.statement_diagnostics_requests - WHERE NOT completed - ORDER BY requested_at DESC`, extraColumns), + "SELECT id, statement_fingerprint, requested_at, "+getMilliseconds+`, expires_at + FROM system.statement_diagnostics_requests + WHERE NOT completed + ORDER BY requested_at DESC`, ) if err != nil { return nil, err @@ -159,13 +129,11 @@ func stmtDiagListOutstandingRequestsInternal( } var minExecutionLatency time.Duration var expiresAt time.Time - if atLeast22dot1 { - if ms, ok := vals[3].(int64); ok { - minExecutionLatency = time.Millisecond * time.Duration(ms) - } - if e, ok := vals[4].(time.Time); ok { - expiresAt = e - } + if ms, ok := vals[3].(int64); ok { + minExecutionLatency = time.Millisecond * time.Duration(ms) + } + if e, ok := vals[4].(time.Time); ok { + expiresAt = e } info := StmtDiagActivationRequest{ ID: vals[0].(int64), diff --git a/pkg/server/statement_diagnostics_requests.go b/pkg/server/statement_diagnostics_requests.go index 268d3aa30c27..5584d81e46a5 100644 --- a/pkg/server/statement_diagnostics_requests.go +++ b/pkg/server/statement_diagnostics_requests.go @@ -12,10 +12,8 @@ package server import ( "context" - "fmt" "time" - "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -131,27 +129,21 @@ func (s *statusServer) StatementDiagnosticsRequests( var err error - // TODO(yuzefovich): remove this version gating in 22.2. - var extraColumns string - if s.admin.server.st.Version.IsActive(ctx, clusterversion.AlterSystemStmtDiagReqs) { - extraColumns = `, - min_execution_latency, - expires_at` - } - // TODO(davidh): Add pagination to this request. it, err := s.internalExecutor.QueryIteratorEx(ctx, "stmt-diag-get-all", nil, /* txn */ sessiondata.InternalExecutorOverride{ User: username.RootUserName(), }, - fmt.Sprintf(`SELECT + `SELECT id, statement_fingerprint, completed, statement_diagnostics_id, - requested_at%s + requested_at, + min_execution_latency, + expires_at FROM - system.statement_diagnostics_requests`, extraColumns)) + system.statement_diagnostics_requests`) if err != nil { return nil, err } @@ -175,16 +167,14 @@ func (s *statusServer) StatementDiagnosticsRequests( if requestedAt, ok := row[4].(*tree.DTimestampTZ); ok { req.RequestedAt = requestedAt.Time } - if extraColumns != "" { - if minExecutionLatency, ok := row[5].(*tree.DInterval); ok { - req.MinExecutionLatency = time.Duration(minExecutionLatency.Duration.Nanos()) - } - if expiresAt, ok := row[6].(*tree.DTimestampTZ); ok { - req.ExpiresAt = expiresAt.Time - // Don't return already expired requests. - if req.ExpiresAt.Before(timeutil.Now()) { - continue - } + if minExecutionLatency, ok := row[5].(*tree.DInterval); ok { + req.MinExecutionLatency = time.Duration(minExecutionLatency.Duration.Nanos()) + } + if expiresAt, ok := row[6].(*tree.DTimestampTZ); ok { + req.ExpiresAt = expiresAt.Time + // Don't return already expired requests. + if req.ExpiresAt.Before(timeutil.Now()) { + continue } } diff --git a/pkg/sql/row/BUILD.bazel b/pkg/sql/row/BUILD.bazel index 3c7cef433b7e..b4147f6458bf 100644 --- a/pkg/sql/row/BUILD.bazel +++ b/pkg/sql/row/BUILD.bazel @@ -23,7 +23,6 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/row", visibility = ["//visibility:public"], deps = [ - "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/keys", diff --git a/pkg/sql/row/kv_batch_streamer.go b/pkg/sql/row/kv_batch_streamer.go index 9349a99b143e..94330314d13a 100644 --- a/pkg/sql/row/kv_batch_streamer.go +++ b/pkg/sql/row/kv_batch_streamer.go @@ -13,7 +13,6 @@ package row import ( "context" - "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" @@ -24,13 +23,11 @@ import ( // CanUseStreamer returns whether the kvstreamer.Streamer API should be used. func CanUseStreamer(ctx context.Context, settings *cluster.Settings) bool { - // TODO(yuzefovich): remove the version gate in 22.2 cycle. - return settings.Version.IsActive(ctx, clusterversion.ScanWholeRows) && - useStreamerEnabled.Get(&settings.SV) + return useStreamerEnabled.Get(&settings.SV) } // useStreamerEnabled determines whether the Streamer API should be used. -// TODO(yuzefovich): remove this in 22.2. +// TODO(yuzefovich): remove this in 23.1. var useStreamerEnabled = settings.RegisterBoolSetting( settings.TenantReadOnly, "sql.distsql.use_streamer.enabled", diff --git a/pkg/sql/stmtdiagnostics/BUILD.bazel b/pkg/sql/stmtdiagnostics/BUILD.bazel index 49623d067569..203a7e623fed 100644 --- a/pkg/sql/stmtdiagnostics/BUILD.bazel +++ b/pkg/sql/stmtdiagnostics/BUILD.bazel @@ -6,7 +6,6 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics", visibility = ["//visibility:public"], deps = [ - "//pkg/clusterversion", "//pkg/gossip", "//pkg/kv", "//pkg/roachpb", diff --git a/pkg/sql/stmtdiagnostics/statement_diagnostics.go b/pkg/sql/stmtdiagnostics/statement_diagnostics.go index 6b8950b5b7e1..a3b46e31188e 100644 --- a/pkg/sql/stmtdiagnostics/statement_diagnostics.go +++ b/pkg/sql/stmtdiagnostics/statement_diagnostics.go @@ -16,7 +16,6 @@ import ( "fmt" "time" - "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -195,11 +194,6 @@ func (r *Registry) poll(ctx context.Context) { } } -// TODO(yuzefovich): remove this in 22.2. -func (r *Registry) isMinExecutionLatencySupported(ctx context.Context) bool { - return r.st.Version.IsActive(ctx, clusterversion.AlterSystemStmtDiagReqs) -} - // RequestID is the ID of a diagnostics request, corresponding to the id // column in statement_diagnostics_requests. // A zero ID is invalid. @@ -285,29 +279,19 @@ func (r *Registry) insertRequestInternal( return 0, err } - if !r.isMinExecutionLatencySupported(ctx) { - if minExecutionLatency != 0 || expiresAfter != 0 { - return 0, errors.New( - "conditional statement diagnostics are only supported " + - "after 22.1 version migrations have completed", - ) - } - } - var reqID RequestID var expiresAt time.Time err = r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { // Check if there's already a pending request for this fingerprint. - var extraConditions string - if r.isMinExecutionLatencySupported(ctx) { - extraConditions = " AND (expires_at IS NULL OR expires_at > now())" - } row, err := r.ie.QueryRowEx(ctx, "stmt-diag-check-pending", txn, sessiondata.InternalExecutorOverride{ User: username.RootUserName(), }, - fmt.Sprintf("SELECT count(1) FROM system.statement_diagnostics_requests "+ - "WHERE completed = false AND statement_fingerprint = $1%s", extraConditions), + `SELECT count(1) FROM system.statement_diagnostics_requests + WHERE + completed = false AND + statement_fingerprint = $1 AND + (expires_at IS NULL OR expires_at > now())`, stmtFingerprint) if err != nil { return err @@ -386,14 +370,6 @@ func (r *Registry) CancelRequest(ctx context.Context, requestID int64) error { return err } - if !r.isMinExecutionLatencySupported(ctx) { - // If conditional diagnostics are not supported for this cluster yet, - // then we cannot cancel the request. - return errors.New( - "statement diagnostics can only be canceled after 22.1 version migrations have completed", - ) - } - row, err := r.ie.QueryRowEx(ctx, "stmt-diag-cancel-request", nil, /* txn */ sessiondata.InternalExecutorOverride{ User: username.RootUserName(), @@ -636,25 +612,20 @@ func (r *Registry) InsertStatementDiagnostics( // updates r.mu.requests accordingly. func (r *Registry) pollRequests(ctx context.Context) error { var rows []tree.Datums - isMinExecutionLatencySupported := r.isMinExecutionLatencySupported(ctx) // Loop until we run the query without straddling an epoch increment. for { r.mu.Lock() epoch := r.mu.epoch r.mu.Unlock() - var extraColumns string - var extraConditions string - if isMinExecutionLatencySupported { - extraColumns = ", min_execution_latency, expires_at" - extraConditions = " AND (expires_at IS NULL OR expires_at > now())" - } it, err := r.ie.QueryIteratorEx(ctx, "stmt-diag-poll", nil, /* txn */ sessiondata.InternalExecutorOverride{ User: username.RootUserName(), }, - fmt.Sprintf("SELECT id, statement_fingerprint%s FROM system.statement_diagnostics_requests "+ - "WHERE completed = false%s", extraColumns, extraConditions)) + `SELECT id, statement_fingerprint, min_execution_latency, expires_at + FROM system.statement_diagnostics_requests + WHERE completed = false AND (expires_at IS NULL OR expires_at > now())`, + ) if err != nil { return err } @@ -686,13 +657,11 @@ func (r *Registry) pollRequests(ctx context.Context) error { stmtFingerprint := string(*row[1].(*tree.DString)) var minExecutionLatency time.Duration var expiresAt time.Time - if isMinExecutionLatencySupported { - if minExecLatency, ok := row[2].(*tree.DInterval); ok { - minExecutionLatency = time.Duration(minExecLatency.Nanos()) - } - if e, ok := row[3].(*tree.DTimestampTZ); ok { - expiresAt = e.Time - } + if minExecLatency, ok := row[2].(*tree.DInterval); ok { + minExecutionLatency = time.Duration(minExecLatency.Nanos()) + } + if e, ok := row[3].(*tree.DTimestampTZ); ok { + expiresAt = e.Time } ids.Add(int(id)) r.addRequestInternalLocked(ctx, id, stmtFingerprint, minExecutionLatency, expiresAt)