diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index cf781525a97d..65cced778866 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -47,7 +47,7 @@ backup_stmt ::= 'BACKUP' targets 'TO' string_or_placeholder opt_as_of_clause opt_incremental opt_with_options cancel_stmt ::= - cancel_job_stmt + cancel_jobs_stmt | cancel_queries_stmt | cancel_sessions_stmt @@ -102,6 +102,7 @@ import_stmt ::= pause_stmt ::= 'PAUSE' 'JOB' a_expr + | 'PAUSE' 'JOBS' select_stmt prepare_stmt ::= 'PREPARE' table_alias_name prep_type_clause 'AS' preparable_stmt @@ -112,6 +113,7 @@ restore_stmt ::= resume_stmt ::= 'RESUME' 'JOB' a_expr + | 'RESUME' 'JOBS' select_stmt revoke_stmt ::= 'REVOKE' privileges 'ON' targets 'FROM' name_list @@ -217,8 +219,9 @@ opt_with_options ::= | 'WITH' 'OPTIONS' '(' kv_option_list ')' | -cancel_job_stmt ::= +cancel_jobs_stmt ::= 'CANCEL' 'JOB' a_expr + | 'CANCEL' 'JOBS' select_stmt cancel_queries_stmt ::= 'CANCEL' 'QUERY' a_expr diff --git a/pkg/sql/cancel_queries.go b/pkg/sql/cancel_queries.go index 044ad6b37963..6b1813ad626e 100644 --- a/pkg/sql/cancel_queries.go +++ b/pkg/sql/cancel_queries.go @@ -58,8 +58,13 @@ func (n *cancelQueriesNode) Next(params runParams) (bool, error) { return ok, err } + datum := n.rows.Values()[0] + if datum == tree.DNull { + return true, nil + } + statusServer := params.extendedEvalCtx.StatusServer - queryIDDatum := n.rows.Values()[0].(*tree.DString) + queryIDDatum := datum.(*tree.DString) queryIDString := string(*queryIDDatum) queryID, err := StringToClusterWideID(queryIDString) diff --git a/pkg/sql/control_job.go b/pkg/sql/control_job.go deleted file mode 100644 index 6405404fcbd9..000000000000 --- a/pkg/sql/control_job.go +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright 2017 The Cockroach Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -// implied. See the License for the specific language governing -// permissions and limitations under the License. - -package sql - -import ( - "context" - "fmt" - - "github.com/cockroachdb/cockroach/pkg/sql/jobs" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sem/types" -) - -type controlJobNode struct { - jobID tree.TypedExpr - desiredStatus jobs.Status -} - -func (p *planner) PauseJob(ctx context.Context, n *tree.PauseJob) (planNode, error) { - typedJobID, err := p.analyzeExpr( - ctx, - n.ID, - nil, - tree.IndexedVarHelper{}, - types.Int, - true, /* requireType */ - "PAUSE JOB", - ) - if err != nil { - return nil, err - } - - return &controlJobNode{ - jobID: typedJobID, - desiredStatus: jobs.StatusPaused, - }, nil -} - -func (p *planner) ResumeJob(ctx context.Context, n *tree.ResumeJob) (planNode, error) { - typedJobID, err := p.analyzeExpr( - ctx, - n.ID, - nil, - tree.IndexedVarHelper{}, - types.Int, - true, /* requireType */ - "RESUME JOB", - ) - if err != nil { - return nil, err - } - - return &controlJobNode{ - jobID: typedJobID, - desiredStatus: jobs.StatusRunning, - }, nil -} - -func (p *planner) CancelJob(ctx context.Context, n *tree.CancelJob) (planNode, error) { - typedJobID, err := p.analyzeExpr( - ctx, - n.ID, - nil, - tree.IndexedVarHelper{}, - types.Int, - true, /* requireType */ - "CANCEL JOB", - ) - if err != nil { - return nil, err - } - - return &controlJobNode{ - jobID: typedJobID, - desiredStatus: jobs.StatusCanceled, - }, nil -} - -func (n *controlJobNode) startExec(params runParams) error { - jobIDDatum, err := n.jobID.Eval(params.EvalContext()) - if err != nil { - return err - } - - jobID, ok := tree.AsDInt(jobIDDatum) - if !ok { - return fmt.Errorf("%s is not a valid job ID", jobIDDatum) - } - - reg := params.p.ExecCfg().JobRegistry - switch n.desiredStatus { - case jobs.StatusPaused: - return reg.Pause(params.ctx, params.p.txn, int64(jobID)) - case jobs.StatusRunning: - return reg.Resume(params.ctx, params.p.txn, int64(jobID)) - case jobs.StatusCanceled: - return reg.Cancel(params.ctx, params.p.txn, int64(jobID)) - default: - panic("unreachable") - } -} - -func (n *controlJobNode) Next(runParams) (bool, error) { return false, nil } -func (*controlJobNode) Values() tree.Datums { return nil } -func (*controlJobNode) Close(context.Context) {} diff --git a/pkg/sql/control_jobs.go b/pkg/sql/control_jobs.go new file mode 100644 index 000000000000..2e613f0f8510 --- /dev/null +++ b/pkg/sql/control_jobs.go @@ -0,0 +1,112 @@ +// Copyright 2017 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package sql + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/sql/jobs" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sem/types" + "github.com/pkg/errors" +) + +type controlJobsNode struct { + rows planNode + desiredStatus jobs.Status + numRows int +} + +var jobCommandToDesiredStatus = map[tree.JobCommand]jobs.Status{ + tree.CancelJob: jobs.StatusCanceled, + tree.ResumeJob: jobs.StatusRunning, + tree.PauseJob: jobs.StatusPaused, +} + +func (p *planner) ControlJobs(ctx context.Context, n *tree.ControlJobs) (planNode, error) { + rows, err := p.newPlan(ctx, n.Jobs, []types.T{types.Int}) + if err != nil { + return nil, err + } + cols := planColumns(rows) + if len(cols) != 1 { + return nil, errors.Errorf("%s JOBS expects a single column source, got %d columns", + tree.JobCommandToStatement[n.Command], len(cols)) + } + if !cols[0].Typ.Equivalent(types.Int) { + return nil, errors.Errorf("%s QUERIES requires int values, not type %s", + tree.JobCommandToStatement[n.Command], cols[0].Typ) + } + + return &controlJobsNode{ + rows: rows, + desiredStatus: jobCommandToDesiredStatus[n.Command], + }, nil +} + +// FastPathResults implements the planNodeFastPath inteface. +func (n *controlJobsNode) FastPathResults() (int, bool) { + return n.numRows, true +} + +// startExec implements the execStartable interface. +func (n *controlJobsNode) startExec(params runParams) error { + reg := params.p.ExecCfg().JobRegistry + for { + ok, err := n.rows.Next(params) + if err != nil { + return err + } + if !ok { + break + } + + datum := n.rows.Values()[0] + if datum == tree.DNull { + continue + } + + jobIDDatum := datum.(*tree.DInt) + jobID, ok := tree.AsDInt(jobIDDatum) + if !ok { + return fmt.Errorf("%s is not a valid job ID", jobIDDatum) + } + + switch n.desiredStatus { + case jobs.StatusPaused: + err = reg.Pause(params.ctx, params.p.txn, int64(jobID)) + case jobs.StatusRunning: + err = reg.Resume(params.ctx, params.p.txn, int64(jobID)) + case jobs.StatusCanceled: + err = reg.Cancel(params.ctx, params.p.txn, int64(jobID)) + default: + err = fmt.Errorf("programmer error: unhandled status %v", n.desiredStatus) + } + if err != nil { + return err + } + n.numRows++ + } + return nil +} + +func (*controlJobsNode) Next(runParams) (bool, error) { return false, nil } + +func (*controlJobsNode) Values() tree.Datums { return nil } + +func (n *controlJobsNode) Close(ctx context.Context) { + n.rows.Close(ctx) +} diff --git a/pkg/sql/expand_plan.go b/pkg/sql/expand_plan.go index f03c6fe55f77..c092f4eec71a 100644 --- a/pkg/sql/expand_plan.go +++ b/pkg/sql/expand_plan.go @@ -339,13 +339,15 @@ func doExpandPlan( case *cancelSessionsNode: n.rows, err = doExpandPlan(ctx, p, noParams, n.rows) + case *controlJobsNode: + n.rows, err = doExpandPlan(ctx, p, noParams, n.rows) + case *valuesNode: case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: case *scrubNode: - case *controlJobNode: case *createDatabaseNode: case *createIndexNode: case *CreateUserNode: @@ -822,13 +824,15 @@ func (p *planner) simplifyOrderings(plan planNode, usefulOrdering sqlbase.Column case *cancelSessionsNode: n.rows = p.simplifyOrderings(n.rows, nil) + case *controlJobsNode: + n.rows = p.simplifyOrderings(n.rows, nil) + case *valuesNode: case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: case *scrubNode: - case *controlJobNode: case *createDatabaseNode: case *createIndexNode: case *CreateUserNode: diff --git a/pkg/sql/logictest/testdata/logic_test/run_control b/pkg/sql/logictest/testdata/logic_test/run_control index e365139e1738..2ba686c60bb5 100644 --- a/pkg/sql/logictest/testdata/logic_test/run_control +++ b/pkg/sql/logictest/testdata/logic_test/run_control @@ -6,27 +6,54 @@ PAUSE JOB 1 query error could not parse "foo" as type int PAUSE JOB 'foo' -query error NULL is not a valid job ID +query error could not parse "foo" as type int +CANCEL JOBS SELECT 'foo' + +query error CANCEL JOBS expects a single column source, got 2 columns +CANCEL JOBS VALUES (1,2) + +statement ok count 0 PAUSE JOB (SELECT id FROM system.jobs LIMIT 0) +statement ok count 0 +PAUSE JOBS SELECT id FROM system.jobs LIMIT 0 + +query error could not parse "foo" as type int +PAUSE JOBS SELECT 'foo' + +query error PAUSE JOBS expects a single column source, got 2 columns +PAUSE JOBS VALUES (1,2) + query error job with ID 1 does not exist RESUME JOB 1 query error could not parse "foo" as type int RESUME JOB 'foo' -query error NULL is not a valid job ID +query error could not parse "foo" as type int +RESUME JOBS SELECT 'foo' + +query error RESUME JOBS expects a single column source, got 2 columns +RESUME JOBS VALUES (1,2) + +statement ok count 0 RESUME JOB (SELECT id FROM system.jobs LIMIT 0) +statement ok count 0 +RESUME JOBS SELECT id FROM system.jobs LIMIT 0 + query error job with ID 1 does not exist CANCEL JOB 1 query error could not parse "foo" as type int CANCEL JOB 'foo' -query error NULL is not a valid job ID +statement ok count 0 CANCEL JOB (SELECT id FROM system.jobs LIMIT 0) +statement ok count 0 +CANCEL JOBS SELECT id FROM system.jobs LIMIT 0 + query error CANCEL QUERIES requires string values, not type int CANCEL QUERY 1 diff --git a/pkg/sql/opt_filters.go b/pkg/sql/opt_filters.go index 50602b726fca..ec4e54c05c41 100644 --- a/pkg/sql/opt_filters.go +++ b/pkg/sql/opt_filters.go @@ -349,12 +349,16 @@ func (p *planner) propagateFilters( return plan, extraFilter, err } + case *controlJobsNode: + if n.rows, err = p.triggerFilterPropagation(ctx, n.rows); err != nil { + return plan, extraFilter, err + } + case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: case *scrubNode: - case *controlJobNode: case *createDatabaseNode: case *createIndexNode: case *CreateUserNode: diff --git a/pkg/sql/opt_limits.go b/pkg/sql/opt_limits.go index 14d82ba6b2f3..2fa099a2d19d 100644 --- a/pkg/sql/opt_limits.go +++ b/pkg/sql/opt_limits.go @@ -193,13 +193,15 @@ func (p *planner) applyLimit(plan planNode, numRows int64, soft bool) { case *cancelSessionsNode: p.setUnlimited(n.rows) + case *controlJobsNode: + p.setUnlimited(n.rows) + case *valuesNode: case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: case *scrubNode: - case *controlJobNode: case *createDatabaseNode: case *createIndexNode: case *CreateUserNode: diff --git a/pkg/sql/opt_needed.go b/pkg/sql/opt_needed.go index bf7f4eae341b..e2bf42efcd19 100644 --- a/pkg/sql/opt_needed.go +++ b/pkg/sql/opt_needed.go @@ -209,11 +209,13 @@ func setNeededColumns(plan planNode, needed []bool) { case *cancelSessionsNode: setNeededColumns(n.rows, allColumns(n.rows)) + case *controlJobsNode: + setNeededColumns(n.rows, allColumns(n.rows)) + case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: - case *controlJobNode: case *scrubNode: case *createDatabaseNode: case *createIndexNode: diff --git a/pkg/sql/parser/help_test.go b/pkg/sql/parser/help_test.go index 9af2e319e218..869510e24bee 100644 --- a/pkg/sql/parser/help_test.go +++ b/pkg/sql/parser/help_test.go @@ -71,7 +71,8 @@ func TestContextualHelp(t *testing.T) { {`ALTER USER foo WITH PASSWORD ??`, `ALTER USER`}, {`CANCEL ??`, `CANCEL`}, - {`CANCEL JOB ??`, `CANCEL JOB`}, + {`CANCEL JOB ??`, `CANCEL JOBS`}, + {`CANCEL JOBS ??`, `CANCEL JOBS`}, {`CANCEL QUERY ??`, `CANCEL QUERIES`}, {`CANCEL QUERY IF ??`, `CANCEL QUERIES`}, {`CANCEL QUERY IF EXISTS ??`, `CANCEL QUERIES`}, @@ -203,9 +204,9 @@ func TestContextualHelp(t *testing.T) { {`GRANT ALL ON foo TO ??`, `GRANT`}, {`GRANT ALL ON foo TO bar ??`, `GRANT`}, - {`PAUSE ??`, `PAUSE JOB`}, + {`PAUSE ??`, `PAUSE JOBS`}, - {`RESUME ??`, `RESUME JOB`}, + {`RESUME ??`, `RESUME JOBS`}, {`REVOKE ALL ??`, `REVOKE`}, {`REVOKE ALL ON foo FROM ??`, `REVOKE`}, diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index 6f26d0683488..9ecd5f3dbd42 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -310,13 +310,13 @@ func TestParse(t *testing.T) { {`DROP SEQUENCE a.b CASCADE`}, {`DROP SEQUENCE a, b CASCADE`}, - {`CANCEL JOB a`}, + {`CANCEL JOBS SELECT a`}, {`CANCEL QUERIES SELECT a`}, {`CANCEL SESSIONS SELECT a`}, {`CANCEL QUERIES IF EXISTS SELECT a`}, {`CANCEL SESSIONS IF EXISTS SELECT a`}, - {`RESUME JOB a`}, - {`PAUSE JOB a`}, + {`RESUME JOBS SELECT a`}, + {`PAUSE JOBS SELECT a`}, {`EXPLAIN SELECT 1`}, {`EXPLAIN EXPLAIN SELECT 1`}, @@ -423,12 +423,12 @@ func TestParse(t *testing.T) { {`PREPARE a (STRING) AS CANCEL SESSIONS SELECT $1`}, {`PREPARE a AS CANCEL SESSIONS IF EXISTS SELECT 1`}, {`PREPARE a (STRING) AS CANCEL SESSIONS IF EXISTS SELECT $1`}, - {`PREPARE a AS CANCEL JOB 1`}, - {`PREPARE a (INT) AS CANCEL JOB $1`}, - {`PREPARE a AS PAUSE JOB 1`}, - {`PREPARE a (INT) AS PAUSE JOB $1`}, - {`PREPARE a AS RESUME JOB 1`}, - {`PREPARE a (INT) AS RESUME JOB $1`}, + {`PREPARE a AS CANCEL JOBS SELECT 1`}, + {`PREPARE a (INT) AS CANCEL JOBS SELECT $1`}, + {`PREPARE a AS PAUSE JOBS SELECT 1`}, + {`PREPARE a (INT) AS PAUSE JOBS SELECT $1`}, + {`PREPARE a AS RESUME JOBS SELECT 1`}, + {`PREPARE a (INT) AS RESUME JOBS SELECT $1`}, {`PREPARE a AS IMPORT TABLE a CREATE USING 'b' CSV DATA ('c') WITH temp = 'd'`}, {`PREPARE a (STRING, STRING, STRING) AS IMPORT TABLE a CREATE USING $1 CSV DATA ($2) WITH temp = $3`}, @@ -1291,6 +1291,9 @@ func TestParse2(t *testing.T) { {`DEALLOCATE PREPARE ALL`, `DEALLOCATE ALL`}, + {`CANCEL JOB a`, `CANCEL JOBS VALUES (a)`}, + {`RESUME JOB a`, `RESUME JOBS VALUES (a)`}, + {`PAUSE JOB a`, `PAUSE JOBS VALUES (a)`}, {`CANCEL QUERY a`, `CANCEL QUERIES VALUES (a)`}, {`CANCEL QUERY IF EXISTS a`, `CANCEL QUERIES IF EXISTS VALUES (a)`}, {`CANCEL SESSION a`, `CANCEL SESSIONS VALUES (a)`}, diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 58d440b4c78e..a135a6556490 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -605,7 +605,7 @@ func newNameFromStr(s string) *tree.Name { %type begin_stmt %type cancel_stmt -%type cancel_job_stmt +%type cancel_jobs_stmt %type cancel_queries_stmt %type cancel_sessions_stmt @@ -1041,10 +1041,10 @@ stmt: | grant_stmt // EXTEND WITH HELP: GRANT | insert_stmt // EXTEND WITH HELP: INSERT | import_stmt // EXTEND WITH HELP: IMPORT -| pause_stmt // EXTEND WITH HELP: PAUSE JOB +| pause_stmt // EXTEND WITH HELP: PAUSE JOBS | prepare_stmt // EXTEND WITH HELP: PREPARE | restore_stmt // EXTEND WITH HELP: RESTORE -| resume_stmt // EXTEND WITH HELP: RESUME JOB +| resume_stmt // EXTEND WITH HELP: RESUME JOBS | revoke_stmt // EXTEND WITH HELP: REVOKE | savepoint_stmt // EXTEND WITH HELP: SAVEPOINT | scrub_stmt // help texts in sub-rule @@ -1705,23 +1705,35 @@ copy_from_stmt: // %Help: CANCEL // %Category: Group -// %Text: CANCEL JOB, CANCEL QUERIES, CANCEL SESSIONS +// %Text: CANCEL JOBS, CANCEL QUERIES, CANCEL SESSIONS cancel_stmt: - cancel_job_stmt // EXTEND WITH HELP: CANCEL JOB + cancel_jobs_stmt // EXTEND WITH HELP: CANCEL JOBS | cancel_queries_stmt // EXTEND WITH HELP: CANCEL QUERIES | cancel_sessions_stmt // EXTEND WITH HELP: CANCEL SESSIONS | CANCEL error // SHOW HELP: CANCEL -// %Help: CANCEL JOB - cancel a background job +// %Help: CANCEL JOBS - cancel background jobs // %Category: Misc -// %Text: CANCEL JOB -// %SeeAlso: SHOW JOBS, PAUSE JOBS, RESUME JOB -cancel_job_stmt: +// %Text: +// CANCEL JOBS +// CANCEL JOB +// %SeeAlso: SHOW JOBS, PAUSE JOBS, RESUME JOBS +cancel_jobs_stmt: CANCEL JOB a_expr { - $$.val = &tree.CancelJob{ID: $3.expr()} + $$.val = &tree.ControlJobs{ + Jobs: &tree.Select{ + Select: &tree.ValuesClause{Tuples: []*tree.Tuple{{Exprs: tree.Exprs{$3.expr()}}}}, + }, + Command: tree.CancelJob, + } } -| CANCEL JOB error // SHOW HELP: CANCEL JOB +| CANCEL JOB error // SHOW HELP: CANCEL JOBS +| CANCEL JOBS select_stmt + { + $$.val = &tree.ControlJobs{Jobs: $3.slct(), Command: tree.CancelJob} + } +| CANCEL JOBS error // SHOW HELP: CANCEL JOBS // %Help: CANCEL QUERIES - cancel running queries // %Category: Misc @@ -2093,10 +2105,10 @@ preparable_stmt: | drop_user_stmt // EXTEND WITH HELP: DROP USER | import_stmt // EXTEND WITH HELP: IMPORT | insert_stmt // EXTEND WITH HELP: INSERT -| pause_stmt // EXTEND WITH HELP: PAUSE JOB +| pause_stmt // EXTEND WITH HELP: PAUSE JOBS | reset_stmt // help texts in sub-rule | restore_stmt // EXTEND WITH HELP: RESTORE -| resume_stmt // EXTEND WITH HELP: RESUME JOB +| resume_stmt // EXTEND WITH HELP: RESUME JOBS | select_stmt // help texts in sub-rule { $$.val = $1.slct() @@ -2909,7 +2921,7 @@ show_queries_stmt: // %Help: SHOW JOBS - list background jobs // %Category: Misc // %Text: SHOW JOBS -// %SeeAlso: CANCEL JOB, PAUSE JOB, RESUME JOB +// %SeeAlso: CANCEL JOBS, PAUSE JOBS, RESUME JOBS show_jobs_stmt: SHOW JOBS { @@ -3383,16 +3395,27 @@ for_grantee_clause: $$.val = tree.NameList(nil) } -// %Help: PAUSE JOB - pause a background job +// %Help: PAUSE JOBS - pause background jobs // %Category: Misc -// %Text: PAUSE JOB -// %SeeAlso: SHOW JOBS, CANCEL JOB, RESUME JOB +// %Text: +// PAUSE JOBS +// PAUSE JOB +// %SeeAlso: SHOW JOBS, CANCEL JOBS, RESUME JOBS pause_stmt: PAUSE JOB a_expr { - $$.val = &tree.PauseJob{ID: $3.expr()} + $$.val = &tree.ControlJobs{ + Jobs: &tree.Select{ + Select: &tree.ValuesClause{Tuples: []*tree.Tuple{{Exprs: tree.Exprs{$3.expr()}}}}, + }, + Command: tree.PauseJob, + } + } +| PAUSE JOBS select_stmt + { + $$.val = &tree.ControlJobs{Jobs: $3.slct(), Command: tree.PauseJob} } -| PAUSE error // SHOW HELP: PAUSE JOB +| PAUSE error // SHOW HELP: PAUSE JOBS // %Help: CREATE TABLE - create a new table // %Category: DDL @@ -4281,16 +4304,27 @@ release_stmt: } | RELEASE error // SHOW HELP: RELEASE -// %Help: RESUME JOB - resume a background job +// %Help: RESUME JOBS - resume background jobs // %Category: Misc -// %Text: RESUME JOB -// %SeeAlso: SHOW JOBS, CANCEL JOB, PAUSE JOB +// %Text: +// RESUME JOBS +// RESUME JOB +// %SeeAlso: SHOW JOBS, CANCEL JOBS, PAUSE JOBS resume_stmt: RESUME JOB a_expr { - $$.val = &tree.ResumeJob{ID: $3.expr()} + $$.val = &tree.ControlJobs{ + Jobs: &tree.Select{ + Select: &tree.ValuesClause{Tuples: []*tree.Tuple{{Exprs: tree.Exprs{$3.expr()}}}}, + }, + Command: tree.ResumeJob, + } + } +| RESUME JOBS select_stmt + { + $$.val = &tree.ControlJobs{Jobs: $3.slct(), Command: tree.ResumeJob} } -| RESUME error // SHOW HELP: RESUME JOB +| RESUME error // SHOW HELP: RESUME JOBS // %Help: SAVEPOINT - start a retryable block // %Category: Txn diff --git a/pkg/sql/pgwire/pgwire_test.go b/pkg/sql/pgwire/pgwire_test.go index c06ab80c63d4..6ff95dd7aeef 100644 --- a/pkg/sql/pgwire/pgwire_test.go +++ b/pkg/sql/pgwire/pgwire_test.go @@ -1236,18 +1236,36 @@ func TestPGPreparedExec(t *testing.T) { baseTest.SetArgs(123).Error("pq: job with ID 123 does not exist"), }, }, + { + "CANCEL JOBS SELECT $1", + []preparedExecTest{ + baseTest.SetArgs(123).Error("pq: job with ID 123 does not exist"), + }, + }, { "RESUME JOB $1", []preparedExecTest{ baseTest.SetArgs(123).Error("pq: job with ID 123 does not exist"), }, }, + { + "RESUME JOBS SELECT $1", + []preparedExecTest{ + baseTest.SetArgs(123).Error("pq: job with ID 123 does not exist"), + }, + }, { "PAUSE JOB $1", []preparedExecTest{ baseTest.SetArgs(123).Error("pq: job with ID 123 does not exist"), }, }, + { + "PAUSE JOBS SELECT $1", + []preparedExecTest{ + baseTest.SetArgs(123).Error("pq: job with ID 123 does not exist"), + }, + }, { "CANCEL QUERY $1", []preparedExecTest{ diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 442b8da44055..dddcb8780561 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -214,6 +214,7 @@ var _ planNodeFastPath = &deleteNode{} var _ planNodeFastPath = &rowCountNode{} var _ planNodeFastPath = &serializeNode{} var _ planNodeFastPath = &setZoneConfigNode{} +var _ planNodeFastPath = &controlJobsNode{} // planNodeRequireSpool serves as marker for nodes whose parent must // ensure that the node is fully run to completion (and the results @@ -641,8 +642,8 @@ func (p *planner) newPlan( return p.CancelQueries(ctx, n) case *tree.CancelSessions: return p.CancelSessions(ctx, n) - case *tree.CancelJob: - return p.CancelJob(ctx, n) + case *tree.ControlJobs: + return p.ControlJobs(ctx, n) case *tree.Scrub: return p.Scrub(ctx, n) case *tree.CreateDatabase: @@ -687,8 +688,6 @@ func (p *planner) newPlan( return p.Insert(ctx, n, desiredTypes) case *tree.ParenSelect: return p.newPlan(ctx, n.Select, desiredTypes) - case *tree.PauseJob: - return p.PauseJob(ctx, n) case *tree.TestingRelocate: return p.TestingRelocate(ctx, n) case *tree.RenameColumn: @@ -699,8 +698,6 @@ func (p *planner) newPlan( return p.RenameIndex(ctx, n) case *tree.RenameTable: return p.RenameTable(ctx, n) - case *tree.ResumeJob: - return p.ResumeJob(ctx, n) case *tree.Revoke: return p.Revoke(ctx, n) case *tree.Scatter: @@ -824,8 +821,8 @@ func (p *planner) doPrepare(ctx context.Context, stmt tree.Statement) (planNode, return p.CancelQueries(ctx, n) case *tree.CancelSessions: return p.CancelSessions(ctx, n) - case *tree.CancelJob: - return p.CancelJob(ctx, n) + case *tree.ControlJobs: + return p.ControlJobs(ctx, n) case *tree.CreateUser: return p.CreateUser(ctx, n) case *tree.CreateTable: @@ -838,10 +835,6 @@ func (p *planner) doPrepare(ctx context.Context, stmt tree.Statement) (planNode, return p.Explain(ctx, n) case *tree.Insert: return p.Insert(ctx, n, nil) - case *tree.PauseJob: - return p.PauseJob(ctx, n) - case *tree.ResumeJob: - return p.ResumeJob(ctx, n) case *tree.Select: return p.Select(ctx, n, nil) case *tree.SelectClause: diff --git a/pkg/sql/sem/tree/run_control.go b/pkg/sql/sem/tree/run_control.go index 0893d4c00cac..59088eeb588e 100644 --- a/pkg/sql/sem/tree/run_control.go +++ b/pkg/sql/sem/tree/run_control.go @@ -14,37 +14,34 @@ package tree -// PauseJob represents a PAUSE JOB statement. -type PauseJob struct { - ID Expr +// ControlJobs represents a PAUSE/RESUME/CANCEL JOBS statement. +type ControlJobs struct { + Jobs *Select + Command JobCommand } -// Format implements the NodeFormatter interface. -func (node *PauseJob) Format(ctx *FmtCtx) { - ctx.WriteString("PAUSE JOB ") - ctx.FormatNode(node.ID) -} - -// ResumeJob represents a RESUME JOB statement. -type ResumeJob struct { - ID Expr -} +// JobCommand determines which type of action to effect on the selected job(s). +type JobCommand int -// Format implements the NodeFormatter interface. -func (node *ResumeJob) Format(ctx *FmtCtx) { - ctx.WriteString("RESUME JOB ") - ctx.FormatNode(node.ID) -} +// JobCommand values +const ( + PauseJob JobCommand = iota + CancelJob + ResumeJob +) -// CancelJob represents a CANCEL JOB statement. -type CancelJob struct { - ID Expr +// JobCommandToStatement translates a job command integer to a statement prefix. +var JobCommandToStatement = map[JobCommand]string{ + PauseJob: "PAUSE", + CancelJob: "CANCEL", + ResumeJob: "RESUME", } // Format implements the NodeFormatter interface. -func (node *CancelJob) Format(ctx *FmtCtx) { - ctx.WriteString("CANCEL JOB ") - ctx.FormatNode(node.ID) +func (n *ControlJobs) Format(ctx *FmtCtx) { + ctx.WriteString(JobCommandToStatement[n.Command]) + ctx.WriteString(" JOBS ") + ctx.FormatNode(n.Jobs) } // CancelQueries represents a CANCEL QUERIES statement. diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go index 013f3d550d67..823fb77d004d 100644 --- a/pkg/sql/sem/tree/stmt.go +++ b/pkg/sql/sem/tree/stmt.go @@ -207,10 +207,14 @@ func (*BeginTransaction) StatementTag() string { return "BEGIN" } func (*BeginTransaction) hiddenFromStats() {} // StatementType implements the Statement interface. -func (*CancelJob) StatementType() StatementType { return Ack } +func (*ControlJobs) StatementType() StatementType { return RowsAffected } // StatementTag returns a short string identifying the type of statement. -func (*CancelJob) StatementTag() string { return "CANCEL JOB" } +func (n *ControlJobs) StatementTag() string { + return fmt.Sprintf("%s JOBS", JobCommandToStatement[n.Command]) +} + +func (*ControlJobs) independentFromParallelizedPriors() {} // StatementType implements the Statement interface. func (*CancelQueries) StatementType() StatementType { return RowsAffected } @@ -435,12 +439,6 @@ func (*ParenSelect) StatementType() StatementType { return Rows } // StatementTag returns a short string identifying the type of statement. func (*ParenSelect) StatementTag() string { return "SELECT" } -// StatementType implements the Statement interface. -func (*PauseJob) StatementType() StatementType { return Ack } - -// StatementTag returns a short string identifying the type of statement. -func (*PauseJob) StatementTag() string { return "PAUSE JOB" } - // StatementType implements the Statement interface. func (*Prepare) StatementType() StatementType { return Ack } @@ -502,12 +500,6 @@ func (*Restore) StatementTag() string { return "RESTORE" } func (*Restore) hiddenFromShowQueries() {} -// StatementType implements the Statement interface. -func (*ResumeJob) StatementType() StatementType { return Ack } - -// StatementTag returns a short string identifying the type of statement. -func (*ResumeJob) StatementTag() string { return "RESUME JOB" } - // StatementType implements the Statement interface. func (*Revoke) StatementType() StatementType { return DDL } @@ -888,7 +880,7 @@ func (n *AlterUserSetPassword) String() string { return AsString(n) } func (n *AlterSequence) String() string { return AsString(n) } func (n *Backup) String() string { return AsString(n) } func (n *BeginTransaction) String() string { return AsString(n) } -func (n *CancelJob) String() string { return AsString(n) } +func (n *ControlJobs) String() string { return AsString(n) } func (n *CancelQueries) String() string { return AsString(n) } func (n *CancelSessions) String() string { return AsString(n) } func (n *CommitTransaction) String() string { return AsString(n) } @@ -919,7 +911,6 @@ func (n *GrantRole) String() string { return AsString(n) } func (n *Insert) String() string { return AsString(n) } func (n *Import) String() string { return AsString(n) } func (n *ParenSelect) String() string { return AsString(n) } -func (n *PauseJob) String() string { return AsString(n) } func (n *Prepare) String() string { return AsString(n) } func (n *ReleaseSavepoint) String() string { return AsString(n) } func (n *TestingRelocate) String() string { return AsString(n) } @@ -928,7 +919,6 @@ func (n *RenameDatabase) String() string { return AsString(n) } func (n *RenameIndex) String() string { return AsString(n) } func (n *RenameTable) String() string { return AsString(n) } func (n *Restore) String() string { return AsString(n) } -func (n *ResumeJob) String() string { return AsString(n) } func (n *Revoke) String() string { return AsString(n) } func (n *RevokeRole) String() string { return AsString(n) } func (n *RollbackToSavepoint) String() string { return AsString(n) } diff --git a/pkg/sql/sem/tree/walk.go b/pkg/sql/sem/tree/walk.go index d9a795267374..bedfb191a093 100644 --- a/pkg/sql/sem/tree/walk.go +++ b/pkg/sql/sem/tree/walk.go @@ -815,6 +815,22 @@ func (stmt *CancelSessions) WalkStmt(v Visitor) Statement { return stmt } +// CopyNode makes a copy of this Statement without recursing in any child Statements. +func (stmt *ControlJobs) CopyNode() *ControlJobs { + stmtCopy := *stmt + return &stmtCopy +} + +// WalkStmt is part of the WalkableStmt interface. +func (stmt *ControlJobs) WalkStmt(v Visitor) Statement { + sel, changed := WalkStmt(v, stmt.Jobs) + if changed { + stmt = stmt.CopyNode() + stmt.Jobs = sel.(*Select) + } + return stmt +} + // CopyNode makes a copy of this Statement without recursing in any child Statements. func (stmt *Import) CopyNode() *Import { stmtCopy := *stmt @@ -1195,6 +1211,7 @@ var _ WalkableStmt = &Update{} var _ WalkableStmt = &ValuesClause{} var _ WalkableStmt = &CancelQueries{} var _ WalkableStmt = &CancelSessions{} +var _ WalkableStmt = &ControlJobs{} // WalkStmt walks the entire parsed stmt calling WalkExpr on each // expression, and replacing each expression with the one returned diff --git a/pkg/sql/walk.go b/pkg/sql/walk.go index fe7be8807dff..5b92ca498641 100644 --- a/pkg/sql/walk.go +++ b/pkg/sql/walk.go @@ -507,10 +507,8 @@ func (v *planVisitor) visit(plan planNode) { case *cancelSessionsNode: v.visit(n.rows) - case *controlJobNode: - if v.observer.expr != nil { - v.expr(name, "jobID", -1, n.jobID) - } + case *controlJobsNode: + v.visit(n.rows) } } @@ -562,7 +560,7 @@ var planNodeNames = map[reflect.Type]string{ reflect.TypeOf(&alterUserSetPasswordNode{}): "alter user", reflect.TypeOf(&cancelQueriesNode{}): "cancel queries", reflect.TypeOf(&cancelSessionsNode{}): "cancel sessions", - reflect.TypeOf(&controlJobNode{}): "control job", + reflect.TypeOf(&controlJobsNode{}): "control jobs", reflect.TypeOf(&createDatabaseNode{}): "create database", reflect.TypeOf(&createIndexNode{}): "create index", reflect.TypeOf(&createTableNode{}): "create table",