Skip to content

Commit

Permalink
sql: make PAUSE/RESUME/CANCEL JOB (->JOBS) support multiple jobs
Browse files Browse the repository at this point in the history
Prior to this patch, `CANCEL/PAUSE/RESUME JOB` would accept a single
expression as argument.

This patch extends the syntax to accept an arbitrary number of job
IDs, via a selection query. The syntax becomes `CANCEL/PAUSE/RESUME
JOBS <query...>`. The syntax `CANCEL/PAUSE/RESUME JOB` with a single
expression is preserved for backward compatibility, and is desugared
to `CANCEL/PAUSE/RESUME JOBS VALUES (...)` during parsing.

Commands are processed atomically. If a job's state cannot be updated,
an error occurs and all the commands are cancelled (i.e. every job
remains in its initial state).

In addition, the statement now returns the number of job
commands processed.

Finally, the amount of boilerplate in the AST nodes is reduced, by
using a single `ControlJobs` node type instead of separate `ResumeJob`
/ `CancelJob` / `PauseJob`.

Release note (sql change): The `CANCEL/PAUSE/RESUME JOB` statements
are extended with variants `CANCEL/PAUSE/RESUME JOBS` able to operate
on multiple jobs at once. For example, to pause all running jobs run
`PAUSE JOBS SELECT id FROM [SHOW JOBS] WHERE status = 'running'`.
  • Loading branch information
knz committed May 1, 2018
1 parent d56b53f commit 9f694b9
Show file tree
Hide file tree
Showing 18 changed files with 315 additions and 222 deletions.
7 changes: 5 additions & 2 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ backup_stmt ::=
'BACKUP' targets 'TO' string_or_placeholder opt_as_of_clause opt_incremental opt_with_options

cancel_stmt ::=
cancel_job_stmt
cancel_jobs_stmt
| cancel_queries_stmt
| cancel_sessions_stmt

Expand Down Expand Up @@ -102,6 +102,7 @@ import_stmt ::=

pause_stmt ::=
'PAUSE' 'JOB' a_expr
| 'PAUSE' 'JOBS' select_stmt

prepare_stmt ::=
'PREPARE' table_alias_name prep_type_clause 'AS' preparable_stmt
Expand All @@ -112,6 +113,7 @@ restore_stmt ::=

resume_stmt ::=
'RESUME' 'JOB' a_expr
| 'RESUME' 'JOBS' select_stmt

revoke_stmt ::=
'REVOKE' privileges 'ON' targets 'FROM' name_list
Expand Down Expand Up @@ -217,8 +219,9 @@ opt_with_options ::=
| 'WITH' 'OPTIONS' '(' kv_option_list ')'
|

cancel_job_stmt ::=
cancel_jobs_stmt ::=
'CANCEL' 'JOB' a_expr
| 'CANCEL' 'JOBS' select_stmt

cancel_queries_stmt ::=
'CANCEL' 'QUERY' a_expr
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/cancel_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,13 @@ func (n *cancelQueriesNode) Next(params runParams) (bool, error) {
return ok, err
}

datum := n.rows.Values()[0]
if datum == tree.DNull {
return true, nil
}

statusServer := params.extendedEvalCtx.StatusServer
queryIDDatum := n.rows.Values()[0].(*tree.DString)
queryIDDatum := datum.(*tree.DString)

queryIDString := string(*queryIDDatum)
queryID, err := StringToClusterWideID(queryIDString)
Expand Down
117 changes: 0 additions & 117 deletions pkg/sql/control_job.go

This file was deleted.

112 changes: 112 additions & 0 deletions pkg/sql/control_jobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2017 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package sql

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/sql/jobs"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/types"
"github.com/pkg/errors"
)

type controlJobsNode struct {
rows planNode
desiredStatus jobs.Status
numRows int
}

var jobCommandToDesiredStatus = map[tree.JobCommand]jobs.Status{
tree.CancelJob: jobs.StatusCanceled,
tree.ResumeJob: jobs.StatusRunning,
tree.PauseJob: jobs.StatusPaused,
}

func (p *planner) ControlJobs(ctx context.Context, n *tree.ControlJobs) (planNode, error) {
rows, err := p.newPlan(ctx, n.Jobs, []types.T{types.Int})
if err != nil {
return nil, err
}
cols := planColumns(rows)
if len(cols) != 1 {
return nil, errors.Errorf("%s JOBS expects a single column source, got %d columns",
tree.JobCommandToStatement[n.Command], len(cols))
}
if !cols[0].Typ.Equivalent(types.Int) {
return nil, errors.Errorf("%s QUERIES requires int values, not type %s",
tree.JobCommandToStatement[n.Command], cols[0].Typ)
}

return &controlJobsNode{
rows: rows,
desiredStatus: jobCommandToDesiredStatus[n.Command],
}, nil
}

// FastPathResults implements the planNodeFastPath inteface.
func (n *controlJobsNode) FastPathResults() (int, bool) {
return n.numRows, true
}

// startExec implements the execStartable interface.
func (n *controlJobsNode) startExec(params runParams) error {
reg := params.p.ExecCfg().JobRegistry
for {
ok, err := n.rows.Next(params)
if err != nil {
return err
}
if !ok {
break
}

datum := n.rows.Values()[0]
if datum == tree.DNull {
continue
}

jobIDDatum := datum.(*tree.DInt)
jobID, ok := tree.AsDInt(jobIDDatum)
if !ok {
return fmt.Errorf("%s is not a valid job ID", jobIDDatum)
}

switch n.desiredStatus {
case jobs.StatusPaused:
err = reg.Pause(params.ctx, params.p.txn, int64(jobID))
case jobs.StatusRunning:
err = reg.Resume(params.ctx, params.p.txn, int64(jobID))
case jobs.StatusCanceled:
err = reg.Cancel(params.ctx, params.p.txn, int64(jobID))
default:
err = fmt.Errorf("programmer error: unhandled status %v", n.desiredStatus)
}
if err != nil {
return err
}
n.numRows++
}
return nil
}

func (*controlJobsNode) Next(runParams) (bool, error) { return false, nil }

func (*controlJobsNode) Values() tree.Datums { return nil }

func (n *controlJobsNode) Close(ctx context.Context) {
n.rows.Close(ctx)
}
8 changes: 6 additions & 2 deletions pkg/sql/expand_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,15 @@ func doExpandPlan(
case *cancelSessionsNode:
n.rows, err = doExpandPlan(ctx, p, noParams, n.rows)

case *controlJobsNode:
n.rows, err = doExpandPlan(ctx, p, noParams, n.rows)

case *valuesNode:
case *alterIndexNode:
case *alterTableNode:
case *alterSequenceNode:
case *alterUserSetPasswordNode:
case *scrubNode:
case *controlJobNode:
case *createDatabaseNode:
case *createIndexNode:
case *CreateUserNode:
Expand Down Expand Up @@ -822,13 +824,15 @@ func (p *planner) simplifyOrderings(plan planNode, usefulOrdering sqlbase.Column
case *cancelSessionsNode:
n.rows = p.simplifyOrderings(n.rows, nil)

case *controlJobsNode:
n.rows = p.simplifyOrderings(n.rows, nil)

case *valuesNode:
case *alterIndexNode:
case *alterTableNode:
case *alterSequenceNode:
case *alterUserSetPasswordNode:
case *scrubNode:
case *controlJobNode:
case *createDatabaseNode:
case *createIndexNode:
case *CreateUserNode:
Expand Down
33 changes: 30 additions & 3 deletions pkg/sql/logictest/testdata/logic_test/run_control
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,54 @@ PAUSE JOB 1
query error could not parse "foo" as type int
PAUSE JOB 'foo'

query error NULL is not a valid job ID
query error could not parse "foo" as type int
CANCEL JOBS SELECT 'foo'

query error CANCEL JOBS expects a single column source, got 2 columns
CANCEL JOBS VALUES (1,2)

statement ok count 0
PAUSE JOB (SELECT id FROM system.jobs LIMIT 0)

statement ok count 0
PAUSE JOBS SELECT id FROM system.jobs LIMIT 0

query error could not parse "foo" as type int
PAUSE JOBS SELECT 'foo'

query error PAUSE JOBS expects a single column source, got 2 columns
PAUSE JOBS VALUES (1,2)

query error job with ID 1 does not exist
RESUME JOB 1

query error could not parse "foo" as type int
RESUME JOB 'foo'

query error NULL is not a valid job ID
query error could not parse "foo" as type int
RESUME JOBS SELECT 'foo'

query error RESUME JOBS expects a single column source, got 2 columns
RESUME JOBS VALUES (1,2)

statement ok count 0
RESUME JOB (SELECT id FROM system.jobs LIMIT 0)

statement ok count 0
RESUME JOBS SELECT id FROM system.jobs LIMIT 0

query error job with ID 1 does not exist
CANCEL JOB 1

query error could not parse "foo" as type int
CANCEL JOB 'foo'

query error NULL is not a valid job ID
statement ok count 0
CANCEL JOB (SELECT id FROM system.jobs LIMIT 0)

statement ok count 0
CANCEL JOBS SELECT id FROM system.jobs LIMIT 0

query error CANCEL QUERIES requires string values, not type int
CANCEL QUERY 1

Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/opt_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,12 +349,16 @@ func (p *planner) propagateFilters(
return plan, extraFilter, err
}

case *controlJobsNode:
if n.rows, err = p.triggerFilterPropagation(ctx, n.rows); err != nil {
return plan, extraFilter, err
}

case *alterIndexNode:
case *alterTableNode:
case *alterSequenceNode:
case *alterUserSetPasswordNode:
case *scrubNode:
case *controlJobNode:
case *createDatabaseNode:
case *createIndexNode:
case *CreateUserNode:
Expand Down
Loading

0 comments on commit 9f694b9

Please sign in to comment.