From 9f694b9b70323d3fcf8b541e86a252ccc0e1a2ba Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Sun, 29 Apr 2018 19:40:29 +0200 Subject: [PATCH] sql: make PAUSE/RESUME/CANCEL JOB (->JOBS) support multiple jobs Prior to this patch, `CANCEL/PAUSE/RESUME JOB` would accept a single expression as argument. This patch extends the syntax to accept an arbitrary number of job IDs, via a selection query. The syntax becomes `CANCEL/PAUSE/RESUME JOBS `. The syntax `CANCEL/PAUSE/RESUME JOB` with a single expression is preserved for backward compatibility, and is desugared to `CANCEL/PAUSE/RESUME JOBS VALUES (...)` during parsing. Commands are processed atomically. If a job's state cannot be updated, an error occurs and all the commands are cancelled (i.e. every job remains in its initial state). In addition, the statement now returns the number of job commands processed. Finally, the amount of boilerplate in the AST nodes is reduced, by using a single `ControlJobs` node type instead of separate `ResumeJob` / `CancelJob` / `PauseJob`. Release note (sql change): The `CANCEL/PAUSE/RESUME JOB` statements are extended with variants `CANCEL/PAUSE/RESUME JOBS` able to operate on multiple jobs at once. For example, to pause all running jobs run `PAUSE JOBS SELECT id FROM [SHOW JOBS] WHERE status = 'running'`. --- docs/generated/sql/bnf/stmt_block.bnf | 7 +- pkg/sql/cancel_queries.go | 7 +- pkg/sql/control_job.go | 117 ------------------ pkg/sql/control_jobs.go | 112 +++++++++++++++++ pkg/sql/expand_plan.go | 8 +- .../logictest/testdata/logic_test/run_control | 33 ++++- pkg/sql/opt_filters.go | 6 +- pkg/sql/opt_limits.go | 4 +- pkg/sql/opt_needed.go | 4 +- pkg/sql/parser/help_test.go | 7 +- pkg/sql/parser/parse_test.go | 21 ++-- pkg/sql/parser/sql.y | 82 ++++++++---- pkg/sql/pgwire/pgwire_test.go | 18 +++ pkg/sql/plan.go | 17 +-- pkg/sql/sem/tree/run_control.go | 45 ++++--- pkg/sql/sem/tree/stmt.go | 24 ++-- pkg/sql/sem/tree/walk.go | 17 +++ pkg/sql/walk.go | 8 +- 18 files changed, 315 insertions(+), 222 deletions(-) delete mode 100644 pkg/sql/control_job.go create mode 100644 pkg/sql/control_jobs.go diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index cf781525a97d..65cced778866 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -47,7 +47,7 @@ backup_stmt ::= 'BACKUP' targets 'TO' string_or_placeholder opt_as_of_clause opt_incremental opt_with_options cancel_stmt ::= - cancel_job_stmt + cancel_jobs_stmt | cancel_queries_stmt | cancel_sessions_stmt @@ -102,6 +102,7 @@ import_stmt ::= pause_stmt ::= 'PAUSE' 'JOB' a_expr + | 'PAUSE' 'JOBS' select_stmt prepare_stmt ::= 'PREPARE' table_alias_name prep_type_clause 'AS' preparable_stmt @@ -112,6 +113,7 @@ restore_stmt ::= resume_stmt ::= 'RESUME' 'JOB' a_expr + | 'RESUME' 'JOBS' select_stmt revoke_stmt ::= 'REVOKE' privileges 'ON' targets 'FROM' name_list @@ -217,8 +219,9 @@ opt_with_options ::= | 'WITH' 'OPTIONS' '(' kv_option_list ')' | -cancel_job_stmt ::= +cancel_jobs_stmt ::= 'CANCEL' 'JOB' a_expr + | 'CANCEL' 'JOBS' select_stmt cancel_queries_stmt ::= 'CANCEL' 'QUERY' a_expr diff --git a/pkg/sql/cancel_queries.go b/pkg/sql/cancel_queries.go index 044ad6b37963..6b1813ad626e 100644 --- a/pkg/sql/cancel_queries.go +++ b/pkg/sql/cancel_queries.go @@ -58,8 +58,13 @@ func (n *cancelQueriesNode) Next(params runParams) (bool, error) { return ok, err } + datum := n.rows.Values()[0] + if datum == tree.DNull { + return true, nil + } + statusServer := params.extendedEvalCtx.StatusServer - queryIDDatum := n.rows.Values()[0].(*tree.DString) + queryIDDatum := datum.(*tree.DString) queryIDString := string(*queryIDDatum) queryID, err := StringToClusterWideID(queryIDString) diff --git a/pkg/sql/control_job.go b/pkg/sql/control_job.go deleted file mode 100644 index 6405404fcbd9..000000000000 --- a/pkg/sql/control_job.go +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright 2017 The Cockroach Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -// implied. See the License for the specific language governing -// permissions and limitations under the License. - -package sql - -import ( - "context" - "fmt" - - "github.com/cockroachdb/cockroach/pkg/sql/jobs" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sem/types" -) - -type controlJobNode struct { - jobID tree.TypedExpr - desiredStatus jobs.Status -} - -func (p *planner) PauseJob(ctx context.Context, n *tree.PauseJob) (planNode, error) { - typedJobID, err := p.analyzeExpr( - ctx, - n.ID, - nil, - tree.IndexedVarHelper{}, - types.Int, - true, /* requireType */ - "PAUSE JOB", - ) - if err != nil { - return nil, err - } - - return &controlJobNode{ - jobID: typedJobID, - desiredStatus: jobs.StatusPaused, - }, nil -} - -func (p *planner) ResumeJob(ctx context.Context, n *tree.ResumeJob) (planNode, error) { - typedJobID, err := p.analyzeExpr( - ctx, - n.ID, - nil, - tree.IndexedVarHelper{}, - types.Int, - true, /* requireType */ - "RESUME JOB", - ) - if err != nil { - return nil, err - } - - return &controlJobNode{ - jobID: typedJobID, - desiredStatus: jobs.StatusRunning, - }, nil -} - -func (p *planner) CancelJob(ctx context.Context, n *tree.CancelJob) (planNode, error) { - typedJobID, err := p.analyzeExpr( - ctx, - n.ID, - nil, - tree.IndexedVarHelper{}, - types.Int, - true, /* requireType */ - "CANCEL JOB", - ) - if err != nil { - return nil, err - } - - return &controlJobNode{ - jobID: typedJobID, - desiredStatus: jobs.StatusCanceled, - }, nil -} - -func (n *controlJobNode) startExec(params runParams) error { - jobIDDatum, err := n.jobID.Eval(params.EvalContext()) - if err != nil { - return err - } - - jobID, ok := tree.AsDInt(jobIDDatum) - if !ok { - return fmt.Errorf("%s is not a valid job ID", jobIDDatum) - } - - reg := params.p.ExecCfg().JobRegistry - switch n.desiredStatus { - case jobs.StatusPaused: - return reg.Pause(params.ctx, params.p.txn, int64(jobID)) - case jobs.StatusRunning: - return reg.Resume(params.ctx, params.p.txn, int64(jobID)) - case jobs.StatusCanceled: - return reg.Cancel(params.ctx, params.p.txn, int64(jobID)) - default: - panic("unreachable") - } -} - -func (n *controlJobNode) Next(runParams) (bool, error) { return false, nil } -func (*controlJobNode) Values() tree.Datums { return nil } -func (*controlJobNode) Close(context.Context) {} diff --git a/pkg/sql/control_jobs.go b/pkg/sql/control_jobs.go new file mode 100644 index 000000000000..2e613f0f8510 --- /dev/null +++ b/pkg/sql/control_jobs.go @@ -0,0 +1,112 @@ +// Copyright 2017 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package sql + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/sql/jobs" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sem/types" + "github.com/pkg/errors" +) + +type controlJobsNode struct { + rows planNode + desiredStatus jobs.Status + numRows int +} + +var jobCommandToDesiredStatus = map[tree.JobCommand]jobs.Status{ + tree.CancelJob: jobs.StatusCanceled, + tree.ResumeJob: jobs.StatusRunning, + tree.PauseJob: jobs.StatusPaused, +} + +func (p *planner) ControlJobs(ctx context.Context, n *tree.ControlJobs) (planNode, error) { + rows, err := p.newPlan(ctx, n.Jobs, []types.T{types.Int}) + if err != nil { + return nil, err + } + cols := planColumns(rows) + if len(cols) != 1 { + return nil, errors.Errorf("%s JOBS expects a single column source, got %d columns", + tree.JobCommandToStatement[n.Command], len(cols)) + } + if !cols[0].Typ.Equivalent(types.Int) { + return nil, errors.Errorf("%s QUERIES requires int values, not type %s", + tree.JobCommandToStatement[n.Command], cols[0].Typ) + } + + return &controlJobsNode{ + rows: rows, + desiredStatus: jobCommandToDesiredStatus[n.Command], + }, nil +} + +// FastPathResults implements the planNodeFastPath inteface. +func (n *controlJobsNode) FastPathResults() (int, bool) { + return n.numRows, true +} + +// startExec implements the execStartable interface. +func (n *controlJobsNode) startExec(params runParams) error { + reg := params.p.ExecCfg().JobRegistry + for { + ok, err := n.rows.Next(params) + if err != nil { + return err + } + if !ok { + break + } + + datum := n.rows.Values()[0] + if datum == tree.DNull { + continue + } + + jobIDDatum := datum.(*tree.DInt) + jobID, ok := tree.AsDInt(jobIDDatum) + if !ok { + return fmt.Errorf("%s is not a valid job ID", jobIDDatum) + } + + switch n.desiredStatus { + case jobs.StatusPaused: + err = reg.Pause(params.ctx, params.p.txn, int64(jobID)) + case jobs.StatusRunning: + err = reg.Resume(params.ctx, params.p.txn, int64(jobID)) + case jobs.StatusCanceled: + err = reg.Cancel(params.ctx, params.p.txn, int64(jobID)) + default: + err = fmt.Errorf("programmer error: unhandled status %v", n.desiredStatus) + } + if err != nil { + return err + } + n.numRows++ + } + return nil +} + +func (*controlJobsNode) Next(runParams) (bool, error) { return false, nil } + +func (*controlJobsNode) Values() tree.Datums { return nil } + +func (n *controlJobsNode) Close(ctx context.Context) { + n.rows.Close(ctx) +} diff --git a/pkg/sql/expand_plan.go b/pkg/sql/expand_plan.go index f03c6fe55f77..c092f4eec71a 100644 --- a/pkg/sql/expand_plan.go +++ b/pkg/sql/expand_plan.go @@ -339,13 +339,15 @@ func doExpandPlan( case *cancelSessionsNode: n.rows, err = doExpandPlan(ctx, p, noParams, n.rows) + case *controlJobsNode: + n.rows, err = doExpandPlan(ctx, p, noParams, n.rows) + case *valuesNode: case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: case *scrubNode: - case *controlJobNode: case *createDatabaseNode: case *createIndexNode: case *CreateUserNode: @@ -822,13 +824,15 @@ func (p *planner) simplifyOrderings(plan planNode, usefulOrdering sqlbase.Column case *cancelSessionsNode: n.rows = p.simplifyOrderings(n.rows, nil) + case *controlJobsNode: + n.rows = p.simplifyOrderings(n.rows, nil) + case *valuesNode: case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: case *scrubNode: - case *controlJobNode: case *createDatabaseNode: case *createIndexNode: case *CreateUserNode: diff --git a/pkg/sql/logictest/testdata/logic_test/run_control b/pkg/sql/logictest/testdata/logic_test/run_control index e365139e1738..2ba686c60bb5 100644 --- a/pkg/sql/logictest/testdata/logic_test/run_control +++ b/pkg/sql/logictest/testdata/logic_test/run_control @@ -6,27 +6,54 @@ PAUSE JOB 1 query error could not parse "foo" as type int PAUSE JOB 'foo' -query error NULL is not a valid job ID +query error could not parse "foo" as type int +CANCEL JOBS SELECT 'foo' + +query error CANCEL JOBS expects a single column source, got 2 columns +CANCEL JOBS VALUES (1,2) + +statement ok count 0 PAUSE JOB (SELECT id FROM system.jobs LIMIT 0) +statement ok count 0 +PAUSE JOBS SELECT id FROM system.jobs LIMIT 0 + +query error could not parse "foo" as type int +PAUSE JOBS SELECT 'foo' + +query error PAUSE JOBS expects a single column source, got 2 columns +PAUSE JOBS VALUES (1,2) + query error job with ID 1 does not exist RESUME JOB 1 query error could not parse "foo" as type int RESUME JOB 'foo' -query error NULL is not a valid job ID +query error could not parse "foo" as type int +RESUME JOBS SELECT 'foo' + +query error RESUME JOBS expects a single column source, got 2 columns +RESUME JOBS VALUES (1,2) + +statement ok count 0 RESUME JOB (SELECT id FROM system.jobs LIMIT 0) +statement ok count 0 +RESUME JOBS SELECT id FROM system.jobs LIMIT 0 + query error job with ID 1 does not exist CANCEL JOB 1 query error could not parse "foo" as type int CANCEL JOB 'foo' -query error NULL is not a valid job ID +statement ok count 0 CANCEL JOB (SELECT id FROM system.jobs LIMIT 0) +statement ok count 0 +CANCEL JOBS SELECT id FROM system.jobs LIMIT 0 + query error CANCEL QUERIES requires string values, not type int CANCEL QUERY 1 diff --git a/pkg/sql/opt_filters.go b/pkg/sql/opt_filters.go index 50602b726fca..ec4e54c05c41 100644 --- a/pkg/sql/opt_filters.go +++ b/pkg/sql/opt_filters.go @@ -349,12 +349,16 @@ func (p *planner) propagateFilters( return plan, extraFilter, err } + case *controlJobsNode: + if n.rows, err = p.triggerFilterPropagation(ctx, n.rows); err != nil { + return plan, extraFilter, err + } + case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: case *scrubNode: - case *controlJobNode: case *createDatabaseNode: case *createIndexNode: case *CreateUserNode: diff --git a/pkg/sql/opt_limits.go b/pkg/sql/opt_limits.go index 14d82ba6b2f3..2fa099a2d19d 100644 --- a/pkg/sql/opt_limits.go +++ b/pkg/sql/opt_limits.go @@ -193,13 +193,15 @@ func (p *planner) applyLimit(plan planNode, numRows int64, soft bool) { case *cancelSessionsNode: p.setUnlimited(n.rows) + case *controlJobsNode: + p.setUnlimited(n.rows) + case *valuesNode: case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: case *scrubNode: - case *controlJobNode: case *createDatabaseNode: case *createIndexNode: case *CreateUserNode: diff --git a/pkg/sql/opt_needed.go b/pkg/sql/opt_needed.go index bf7f4eae341b..e2bf42efcd19 100644 --- a/pkg/sql/opt_needed.go +++ b/pkg/sql/opt_needed.go @@ -209,11 +209,13 @@ func setNeededColumns(plan planNode, needed []bool) { case *cancelSessionsNode: setNeededColumns(n.rows, allColumns(n.rows)) + case *controlJobsNode: + setNeededColumns(n.rows, allColumns(n.rows)) + case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: - case *controlJobNode: case *scrubNode: case *createDatabaseNode: case *createIndexNode: diff --git a/pkg/sql/parser/help_test.go b/pkg/sql/parser/help_test.go index 9af2e319e218..869510e24bee 100644 --- a/pkg/sql/parser/help_test.go +++ b/pkg/sql/parser/help_test.go @@ -71,7 +71,8 @@ func TestContextualHelp(t *testing.T) { {`ALTER USER foo WITH PASSWORD ??`, `ALTER USER`}, {`CANCEL ??`, `CANCEL`}, - {`CANCEL JOB ??`, `CANCEL JOB`}, + {`CANCEL JOB ??`, `CANCEL JOBS`}, + {`CANCEL JOBS ??`, `CANCEL JOBS`}, {`CANCEL QUERY ??`, `CANCEL QUERIES`}, {`CANCEL QUERY IF ??`, `CANCEL QUERIES`}, {`CANCEL QUERY IF EXISTS ??`, `CANCEL QUERIES`}, @@ -203,9 +204,9 @@ func TestContextualHelp(t *testing.T) { {`GRANT ALL ON foo TO ??`, `GRANT`}, {`GRANT ALL ON foo TO bar ??`, `GRANT`}, - {`PAUSE ??`, `PAUSE JOB`}, + {`PAUSE ??`, `PAUSE JOBS`}, - {`RESUME ??`, `RESUME JOB`}, + {`RESUME ??`, `RESUME JOBS`}, {`REVOKE ALL ??`, `REVOKE`}, {`REVOKE ALL ON foo FROM ??`, `REVOKE`}, diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index 6f26d0683488..9ecd5f3dbd42 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -310,13 +310,13 @@ func TestParse(t *testing.T) { {`DROP SEQUENCE a.b CASCADE`}, {`DROP SEQUENCE a, b CASCADE`}, - {`CANCEL JOB a`}, + {`CANCEL JOBS SELECT a`}, {`CANCEL QUERIES SELECT a`}, {`CANCEL SESSIONS SELECT a`}, {`CANCEL QUERIES IF EXISTS SELECT a`}, {`CANCEL SESSIONS IF EXISTS SELECT a`}, - {`RESUME JOB a`}, - {`PAUSE JOB a`}, + {`RESUME JOBS SELECT a`}, + {`PAUSE JOBS SELECT a`}, {`EXPLAIN SELECT 1`}, {`EXPLAIN EXPLAIN SELECT 1`}, @@ -423,12 +423,12 @@ func TestParse(t *testing.T) { {`PREPARE a (STRING) AS CANCEL SESSIONS SELECT $1`}, {`PREPARE a AS CANCEL SESSIONS IF EXISTS SELECT 1`}, {`PREPARE a (STRING) AS CANCEL SESSIONS IF EXISTS SELECT $1`}, - {`PREPARE a AS CANCEL JOB 1`}, - {`PREPARE a (INT) AS CANCEL JOB $1`}, - {`PREPARE a AS PAUSE JOB 1`}, - {`PREPARE a (INT) AS PAUSE JOB $1`}, - {`PREPARE a AS RESUME JOB 1`}, - {`PREPARE a (INT) AS RESUME JOB $1`}, + {`PREPARE a AS CANCEL JOBS SELECT 1`}, + {`PREPARE a (INT) AS CANCEL JOBS SELECT $1`}, + {`PREPARE a AS PAUSE JOBS SELECT 1`}, + {`PREPARE a (INT) AS PAUSE JOBS SELECT $1`}, + {`PREPARE a AS RESUME JOBS SELECT 1`}, + {`PREPARE a (INT) AS RESUME JOBS SELECT $1`}, {`PREPARE a AS IMPORT TABLE a CREATE USING 'b' CSV DATA ('c') WITH temp = 'd'`}, {`PREPARE a (STRING, STRING, STRING) AS IMPORT TABLE a CREATE USING $1 CSV DATA ($2) WITH temp = $3`}, @@ -1291,6 +1291,9 @@ func TestParse2(t *testing.T) { {`DEALLOCATE PREPARE ALL`, `DEALLOCATE ALL`}, + {`CANCEL JOB a`, `CANCEL JOBS VALUES (a)`}, + {`RESUME JOB a`, `RESUME JOBS VALUES (a)`}, + {`PAUSE JOB a`, `PAUSE JOBS VALUES (a)`}, {`CANCEL QUERY a`, `CANCEL QUERIES VALUES (a)`}, {`CANCEL QUERY IF EXISTS a`, `CANCEL QUERIES IF EXISTS VALUES (a)`}, {`CANCEL SESSION a`, `CANCEL SESSIONS VALUES (a)`}, diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 58d440b4c78e..a135a6556490 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -605,7 +605,7 @@ func newNameFromStr(s string) *tree.Name { %type begin_stmt %type cancel_stmt -%type cancel_job_stmt +%type cancel_jobs_stmt %type cancel_queries_stmt %type cancel_sessions_stmt @@ -1041,10 +1041,10 @@ stmt: | grant_stmt // EXTEND WITH HELP: GRANT | insert_stmt // EXTEND WITH HELP: INSERT | import_stmt // EXTEND WITH HELP: IMPORT -| pause_stmt // EXTEND WITH HELP: PAUSE JOB +| pause_stmt // EXTEND WITH HELP: PAUSE JOBS | prepare_stmt // EXTEND WITH HELP: PREPARE | restore_stmt // EXTEND WITH HELP: RESTORE -| resume_stmt // EXTEND WITH HELP: RESUME JOB +| resume_stmt // EXTEND WITH HELP: RESUME JOBS | revoke_stmt // EXTEND WITH HELP: REVOKE | savepoint_stmt // EXTEND WITH HELP: SAVEPOINT | scrub_stmt // help texts in sub-rule @@ -1705,23 +1705,35 @@ copy_from_stmt: // %Help: CANCEL // %Category: Group -// %Text: CANCEL JOB, CANCEL QUERIES, CANCEL SESSIONS +// %Text: CANCEL JOBS, CANCEL QUERIES, CANCEL SESSIONS cancel_stmt: - cancel_job_stmt // EXTEND WITH HELP: CANCEL JOB + cancel_jobs_stmt // EXTEND WITH HELP: CANCEL JOBS | cancel_queries_stmt // EXTEND WITH HELP: CANCEL QUERIES | cancel_sessions_stmt // EXTEND WITH HELP: CANCEL SESSIONS | CANCEL error // SHOW HELP: CANCEL -// %Help: CANCEL JOB - cancel a background job +// %Help: CANCEL JOBS - cancel background jobs // %Category: Misc -// %Text: CANCEL JOB -// %SeeAlso: SHOW JOBS, PAUSE JOBS, RESUME JOB -cancel_job_stmt: +// %Text: +// CANCEL JOBS +// CANCEL JOB +// %SeeAlso: SHOW JOBS, PAUSE JOBS, RESUME JOBS +cancel_jobs_stmt: CANCEL JOB a_expr { - $$.val = &tree.CancelJob{ID: $3.expr()} + $$.val = &tree.ControlJobs{ + Jobs: &tree.Select{ + Select: &tree.ValuesClause{Tuples: []*tree.Tuple{{Exprs: tree.Exprs{$3.expr()}}}}, + }, + Command: tree.CancelJob, + } } -| CANCEL JOB error // SHOW HELP: CANCEL JOB +| CANCEL JOB error // SHOW HELP: CANCEL JOBS +| CANCEL JOBS select_stmt + { + $$.val = &tree.ControlJobs{Jobs: $3.slct(), Command: tree.CancelJob} + } +| CANCEL JOBS error // SHOW HELP: CANCEL JOBS // %Help: CANCEL QUERIES - cancel running queries // %Category: Misc @@ -2093,10 +2105,10 @@ preparable_stmt: | drop_user_stmt // EXTEND WITH HELP: DROP USER | import_stmt // EXTEND WITH HELP: IMPORT | insert_stmt // EXTEND WITH HELP: INSERT -| pause_stmt // EXTEND WITH HELP: PAUSE JOB +| pause_stmt // EXTEND WITH HELP: PAUSE JOBS | reset_stmt // help texts in sub-rule | restore_stmt // EXTEND WITH HELP: RESTORE -| resume_stmt // EXTEND WITH HELP: RESUME JOB +| resume_stmt // EXTEND WITH HELP: RESUME JOBS | select_stmt // help texts in sub-rule { $$.val = $1.slct() @@ -2909,7 +2921,7 @@ show_queries_stmt: // %Help: SHOW JOBS - list background jobs // %Category: Misc // %Text: SHOW JOBS -// %SeeAlso: CANCEL JOB, PAUSE JOB, RESUME JOB +// %SeeAlso: CANCEL JOBS, PAUSE JOBS, RESUME JOBS show_jobs_stmt: SHOW JOBS { @@ -3383,16 +3395,27 @@ for_grantee_clause: $$.val = tree.NameList(nil) } -// %Help: PAUSE JOB - pause a background job +// %Help: PAUSE JOBS - pause background jobs // %Category: Misc -// %Text: PAUSE JOB -// %SeeAlso: SHOW JOBS, CANCEL JOB, RESUME JOB +// %Text: +// PAUSE JOBS +// PAUSE JOB +// %SeeAlso: SHOW JOBS, CANCEL JOBS, RESUME JOBS pause_stmt: PAUSE JOB a_expr { - $$.val = &tree.PauseJob{ID: $3.expr()} + $$.val = &tree.ControlJobs{ + Jobs: &tree.Select{ + Select: &tree.ValuesClause{Tuples: []*tree.Tuple{{Exprs: tree.Exprs{$3.expr()}}}}, + }, + Command: tree.PauseJob, + } + } +| PAUSE JOBS select_stmt + { + $$.val = &tree.ControlJobs{Jobs: $3.slct(), Command: tree.PauseJob} } -| PAUSE error // SHOW HELP: PAUSE JOB +| PAUSE error // SHOW HELP: PAUSE JOBS // %Help: CREATE TABLE - create a new table // %Category: DDL @@ -4281,16 +4304,27 @@ release_stmt: } | RELEASE error // SHOW HELP: RELEASE -// %Help: RESUME JOB - resume a background job +// %Help: RESUME JOBS - resume background jobs // %Category: Misc -// %Text: RESUME JOB -// %SeeAlso: SHOW JOBS, CANCEL JOB, PAUSE JOB +// %Text: +// RESUME JOBS +// RESUME JOB +// %SeeAlso: SHOW JOBS, CANCEL JOBS, PAUSE JOBS resume_stmt: RESUME JOB a_expr { - $$.val = &tree.ResumeJob{ID: $3.expr()} + $$.val = &tree.ControlJobs{ + Jobs: &tree.Select{ + Select: &tree.ValuesClause{Tuples: []*tree.Tuple{{Exprs: tree.Exprs{$3.expr()}}}}, + }, + Command: tree.ResumeJob, + } + } +| RESUME JOBS select_stmt + { + $$.val = &tree.ControlJobs{Jobs: $3.slct(), Command: tree.ResumeJob} } -| RESUME error // SHOW HELP: RESUME JOB +| RESUME error // SHOW HELP: RESUME JOBS // %Help: SAVEPOINT - start a retryable block // %Category: Txn diff --git a/pkg/sql/pgwire/pgwire_test.go b/pkg/sql/pgwire/pgwire_test.go index c06ab80c63d4..6ff95dd7aeef 100644 --- a/pkg/sql/pgwire/pgwire_test.go +++ b/pkg/sql/pgwire/pgwire_test.go @@ -1236,18 +1236,36 @@ func TestPGPreparedExec(t *testing.T) { baseTest.SetArgs(123).Error("pq: job with ID 123 does not exist"), }, }, + { + "CANCEL JOBS SELECT $1", + []preparedExecTest{ + baseTest.SetArgs(123).Error("pq: job with ID 123 does not exist"), + }, + }, { "RESUME JOB $1", []preparedExecTest{ baseTest.SetArgs(123).Error("pq: job with ID 123 does not exist"), }, }, + { + "RESUME JOBS SELECT $1", + []preparedExecTest{ + baseTest.SetArgs(123).Error("pq: job with ID 123 does not exist"), + }, + }, { "PAUSE JOB $1", []preparedExecTest{ baseTest.SetArgs(123).Error("pq: job with ID 123 does not exist"), }, }, + { + "PAUSE JOBS SELECT $1", + []preparedExecTest{ + baseTest.SetArgs(123).Error("pq: job with ID 123 does not exist"), + }, + }, { "CANCEL QUERY $1", []preparedExecTest{ diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 442b8da44055..dddcb8780561 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -214,6 +214,7 @@ var _ planNodeFastPath = &deleteNode{} var _ planNodeFastPath = &rowCountNode{} var _ planNodeFastPath = &serializeNode{} var _ planNodeFastPath = &setZoneConfigNode{} +var _ planNodeFastPath = &controlJobsNode{} // planNodeRequireSpool serves as marker for nodes whose parent must // ensure that the node is fully run to completion (and the results @@ -641,8 +642,8 @@ func (p *planner) newPlan( return p.CancelQueries(ctx, n) case *tree.CancelSessions: return p.CancelSessions(ctx, n) - case *tree.CancelJob: - return p.CancelJob(ctx, n) + case *tree.ControlJobs: + return p.ControlJobs(ctx, n) case *tree.Scrub: return p.Scrub(ctx, n) case *tree.CreateDatabase: @@ -687,8 +688,6 @@ func (p *planner) newPlan( return p.Insert(ctx, n, desiredTypes) case *tree.ParenSelect: return p.newPlan(ctx, n.Select, desiredTypes) - case *tree.PauseJob: - return p.PauseJob(ctx, n) case *tree.TestingRelocate: return p.TestingRelocate(ctx, n) case *tree.RenameColumn: @@ -699,8 +698,6 @@ func (p *planner) newPlan( return p.RenameIndex(ctx, n) case *tree.RenameTable: return p.RenameTable(ctx, n) - case *tree.ResumeJob: - return p.ResumeJob(ctx, n) case *tree.Revoke: return p.Revoke(ctx, n) case *tree.Scatter: @@ -824,8 +821,8 @@ func (p *planner) doPrepare(ctx context.Context, stmt tree.Statement) (planNode, return p.CancelQueries(ctx, n) case *tree.CancelSessions: return p.CancelSessions(ctx, n) - case *tree.CancelJob: - return p.CancelJob(ctx, n) + case *tree.ControlJobs: + return p.ControlJobs(ctx, n) case *tree.CreateUser: return p.CreateUser(ctx, n) case *tree.CreateTable: @@ -838,10 +835,6 @@ func (p *planner) doPrepare(ctx context.Context, stmt tree.Statement) (planNode, return p.Explain(ctx, n) case *tree.Insert: return p.Insert(ctx, n, nil) - case *tree.PauseJob: - return p.PauseJob(ctx, n) - case *tree.ResumeJob: - return p.ResumeJob(ctx, n) case *tree.Select: return p.Select(ctx, n, nil) case *tree.SelectClause: diff --git a/pkg/sql/sem/tree/run_control.go b/pkg/sql/sem/tree/run_control.go index 0893d4c00cac..59088eeb588e 100644 --- a/pkg/sql/sem/tree/run_control.go +++ b/pkg/sql/sem/tree/run_control.go @@ -14,37 +14,34 @@ package tree -// PauseJob represents a PAUSE JOB statement. -type PauseJob struct { - ID Expr +// ControlJobs represents a PAUSE/RESUME/CANCEL JOBS statement. +type ControlJobs struct { + Jobs *Select + Command JobCommand } -// Format implements the NodeFormatter interface. -func (node *PauseJob) Format(ctx *FmtCtx) { - ctx.WriteString("PAUSE JOB ") - ctx.FormatNode(node.ID) -} - -// ResumeJob represents a RESUME JOB statement. -type ResumeJob struct { - ID Expr -} +// JobCommand determines which type of action to effect on the selected job(s). +type JobCommand int -// Format implements the NodeFormatter interface. -func (node *ResumeJob) Format(ctx *FmtCtx) { - ctx.WriteString("RESUME JOB ") - ctx.FormatNode(node.ID) -} +// JobCommand values +const ( + PauseJob JobCommand = iota + CancelJob + ResumeJob +) -// CancelJob represents a CANCEL JOB statement. -type CancelJob struct { - ID Expr +// JobCommandToStatement translates a job command integer to a statement prefix. +var JobCommandToStatement = map[JobCommand]string{ + PauseJob: "PAUSE", + CancelJob: "CANCEL", + ResumeJob: "RESUME", } // Format implements the NodeFormatter interface. -func (node *CancelJob) Format(ctx *FmtCtx) { - ctx.WriteString("CANCEL JOB ") - ctx.FormatNode(node.ID) +func (n *ControlJobs) Format(ctx *FmtCtx) { + ctx.WriteString(JobCommandToStatement[n.Command]) + ctx.WriteString(" JOBS ") + ctx.FormatNode(n.Jobs) } // CancelQueries represents a CANCEL QUERIES statement. diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go index 013f3d550d67..823fb77d004d 100644 --- a/pkg/sql/sem/tree/stmt.go +++ b/pkg/sql/sem/tree/stmt.go @@ -207,10 +207,14 @@ func (*BeginTransaction) StatementTag() string { return "BEGIN" } func (*BeginTransaction) hiddenFromStats() {} // StatementType implements the Statement interface. -func (*CancelJob) StatementType() StatementType { return Ack } +func (*ControlJobs) StatementType() StatementType { return RowsAffected } // StatementTag returns a short string identifying the type of statement. -func (*CancelJob) StatementTag() string { return "CANCEL JOB" } +func (n *ControlJobs) StatementTag() string { + return fmt.Sprintf("%s JOBS", JobCommandToStatement[n.Command]) +} + +func (*ControlJobs) independentFromParallelizedPriors() {} // StatementType implements the Statement interface. func (*CancelQueries) StatementType() StatementType { return RowsAffected } @@ -435,12 +439,6 @@ func (*ParenSelect) StatementType() StatementType { return Rows } // StatementTag returns a short string identifying the type of statement. func (*ParenSelect) StatementTag() string { return "SELECT" } -// StatementType implements the Statement interface. -func (*PauseJob) StatementType() StatementType { return Ack } - -// StatementTag returns a short string identifying the type of statement. -func (*PauseJob) StatementTag() string { return "PAUSE JOB" } - // StatementType implements the Statement interface. func (*Prepare) StatementType() StatementType { return Ack } @@ -502,12 +500,6 @@ func (*Restore) StatementTag() string { return "RESTORE" } func (*Restore) hiddenFromShowQueries() {} -// StatementType implements the Statement interface. -func (*ResumeJob) StatementType() StatementType { return Ack } - -// StatementTag returns a short string identifying the type of statement. -func (*ResumeJob) StatementTag() string { return "RESUME JOB" } - // StatementType implements the Statement interface. func (*Revoke) StatementType() StatementType { return DDL } @@ -888,7 +880,7 @@ func (n *AlterUserSetPassword) String() string { return AsString(n) } func (n *AlterSequence) String() string { return AsString(n) } func (n *Backup) String() string { return AsString(n) } func (n *BeginTransaction) String() string { return AsString(n) } -func (n *CancelJob) String() string { return AsString(n) } +func (n *ControlJobs) String() string { return AsString(n) } func (n *CancelQueries) String() string { return AsString(n) } func (n *CancelSessions) String() string { return AsString(n) } func (n *CommitTransaction) String() string { return AsString(n) } @@ -919,7 +911,6 @@ func (n *GrantRole) String() string { return AsString(n) } func (n *Insert) String() string { return AsString(n) } func (n *Import) String() string { return AsString(n) } func (n *ParenSelect) String() string { return AsString(n) } -func (n *PauseJob) String() string { return AsString(n) } func (n *Prepare) String() string { return AsString(n) } func (n *ReleaseSavepoint) String() string { return AsString(n) } func (n *TestingRelocate) String() string { return AsString(n) } @@ -928,7 +919,6 @@ func (n *RenameDatabase) String() string { return AsString(n) } func (n *RenameIndex) String() string { return AsString(n) } func (n *RenameTable) String() string { return AsString(n) } func (n *Restore) String() string { return AsString(n) } -func (n *ResumeJob) String() string { return AsString(n) } func (n *Revoke) String() string { return AsString(n) } func (n *RevokeRole) String() string { return AsString(n) } func (n *RollbackToSavepoint) String() string { return AsString(n) } diff --git a/pkg/sql/sem/tree/walk.go b/pkg/sql/sem/tree/walk.go index d9a795267374..bedfb191a093 100644 --- a/pkg/sql/sem/tree/walk.go +++ b/pkg/sql/sem/tree/walk.go @@ -815,6 +815,22 @@ func (stmt *CancelSessions) WalkStmt(v Visitor) Statement { return stmt } +// CopyNode makes a copy of this Statement without recursing in any child Statements. +func (stmt *ControlJobs) CopyNode() *ControlJobs { + stmtCopy := *stmt + return &stmtCopy +} + +// WalkStmt is part of the WalkableStmt interface. +func (stmt *ControlJobs) WalkStmt(v Visitor) Statement { + sel, changed := WalkStmt(v, stmt.Jobs) + if changed { + stmt = stmt.CopyNode() + stmt.Jobs = sel.(*Select) + } + return stmt +} + // CopyNode makes a copy of this Statement without recursing in any child Statements. func (stmt *Import) CopyNode() *Import { stmtCopy := *stmt @@ -1195,6 +1211,7 @@ var _ WalkableStmt = &Update{} var _ WalkableStmt = &ValuesClause{} var _ WalkableStmt = &CancelQueries{} var _ WalkableStmt = &CancelSessions{} +var _ WalkableStmt = &ControlJobs{} // WalkStmt walks the entire parsed stmt calling WalkExpr on each // expression, and replacing each expression with the one returned diff --git a/pkg/sql/walk.go b/pkg/sql/walk.go index fe7be8807dff..5b92ca498641 100644 --- a/pkg/sql/walk.go +++ b/pkg/sql/walk.go @@ -507,10 +507,8 @@ func (v *planVisitor) visit(plan planNode) { case *cancelSessionsNode: v.visit(n.rows) - case *controlJobNode: - if v.observer.expr != nil { - v.expr(name, "jobID", -1, n.jobID) - } + case *controlJobsNode: + v.visit(n.rows) } } @@ -562,7 +560,7 @@ var planNodeNames = map[reflect.Type]string{ reflect.TypeOf(&alterUserSetPasswordNode{}): "alter user", reflect.TypeOf(&cancelQueriesNode{}): "cancel queries", reflect.TypeOf(&cancelSessionsNode{}): "cancel sessions", - reflect.TypeOf(&controlJobNode{}): "control job", + reflect.TypeOf(&controlJobsNode{}): "control jobs", reflect.TypeOf(&createDatabaseNode{}): "create database", reflect.TypeOf(&createIndexNode{}): "create index", reflect.TypeOf(&createTableNode{}): "create table",