From 93586156dfab2cf29a1e50515dd16c9b7105b4d7 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Sun, 29 Apr 2018 16:18:33 +0200 Subject: [PATCH 1/3] sql: make CANCEL QUERY (-> QUERIES) support multiple queries Prior to this patch, `CANCEL QUERY` would accept a single expression as argument. This patch extends the syntax to accept an arbitrary number of query IDs, via a selection query. The syntax becomes `CANCEL QUERIES `. The syntax `CANCEL QUERY` with a single expression is preserved for backward compatibility, and is desugared to `CANCEL QUERIES VALUES (...)` during parsing. Queries are canceled sequentially in the order given by the selection. If a query cannot be canceled and IF EXISTS is not specified, an error occurs and no further query cancellations are attempted. In addition, the statement now returns the number of query cancellations processed (including non-existent queries). Release note (sql change): The `CANCEL QUERY` statement is extended with a variant `CANCEL QUERIES` able to cancel multiple queries at once. For example, to cancel all queries on node 1 run `CANCEL QUERIES SELECT query_id FROM [SHOW CLUSTER QUERIES] WHERE node_id = 1`. --- docs/generated/sql/bnf/cancel_query.bnf | 4 +- docs/generated/sql/bnf/stmt_block.bnf | 6 +- pkg/cmd/docgen/diagrams.go | 2 +- .../{cancel_query.go => cancel_queries.go} | 59 ++++++++++--------- pkg/sql/expand_plan.go | 8 ++- .../logictest/testdata/logic_test/run_control | 12 +++- pkg/sql/opt_filters.go | 6 +- pkg/sql/opt_limits.go | 4 +- pkg/sql/opt_needed.go | 4 +- pkg/sql/parser/help_test.go | 9 ++- pkg/sql/parser/parse_test.go | 15 +++-- pkg/sql/parser/sql.y | 41 +++++++++---- pkg/sql/plan.go | 8 +-- pkg/sql/sem/tree/run_control.go | 12 ++-- pkg/sql/sem/tree/stmt.go | 10 ++-- pkg/sql/sem/tree/walk.go | 17 ++++++ pkg/sql/walk.go | 8 +-- 17 files changed, 149 insertions(+), 76 deletions(-) rename pkg/sql/{cancel_query.go => cancel_queries.go} (52%) diff --git a/docs/generated/sql/bnf/cancel_query.bnf b/docs/generated/sql/bnf/cancel_query.bnf index d93dff5787c3..c80b563f75a9 100644 --- a/docs/generated/sql/bnf/cancel_query.bnf +++ b/docs/generated/sql/bnf/cancel_query.bnf @@ -1,3 +1,5 @@ -cancel_query_stmt ::= +cancel_queries_stmt ::= 'CANCEL' 'QUERY' query_id | 'CANCEL' 'QUERY' 'IF' 'EXISTS' query_id + | 'CANCEL' 'QUERIES' select_stmt + | 'CANCEL' 'QUERIES' 'IF' 'EXISTS' select_stmt diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index 96b7aca4d56e..7e6c8eda255d 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -48,7 +48,7 @@ backup_stmt ::= cancel_stmt ::= cancel_job_stmt - | cancel_query_stmt + | cancel_queries_stmt | cancel_session_stmt copy_from_stmt ::= @@ -220,9 +220,11 @@ opt_with_options ::= cancel_job_stmt ::= 'CANCEL' 'JOB' a_expr -cancel_query_stmt ::= +cancel_queries_stmt ::= 'CANCEL' 'QUERY' a_expr | 'CANCEL' 'QUERY' 'IF' 'EXISTS' a_expr + | 'CANCEL' 'QUERIES' select_stmt + | 'CANCEL' 'QUERIES' 'IF' 'EXISTS' select_stmt cancel_session_stmt ::= 'CANCEL' 'SESSION' a_expr diff --git a/pkg/cmd/docgen/diagrams.go b/pkg/cmd/docgen/diagrams.go index 018d7f8c0f8e..adbb6d3f1792 100644 --- a/pkg/cmd/docgen/diagrams.go +++ b/pkg/cmd/docgen/diagrams.go @@ -437,7 +437,7 @@ var specs = []stmtSpec{ inline: []string{"opt_transaction"}, match: []*regexp.Regexp{regexp.MustCompile("'COMMIT'|'END'")}, }, - {name: "cancel_query", stmt: "cancel_query_stmt", replace: map[string]string{"a_expr": "query_id"}, unlink: []string{"query_id"}}, + {name: "cancel_query", stmt: "cancel_queries_stmt", replace: map[string]string{"a_expr": "query_id"}, unlink: []string{"query_id"}}, {name: "create_database_stmt", inline: []string{"opt_encoding_clause"}, replace: map[string]string{"'SCONST'": "encoding"}, unlink: []string{"name", "encoding"}}, { name: "create_index_stmt", diff --git a/pkg/sql/cancel_query.go b/pkg/sql/cancel_queries.go similarity index 52% rename from pkg/sql/cancel_query.go rename to pkg/sql/cancel_queries.go index 6a3a21b0898b..044ad6b37963 100644 --- a/pkg/sql/cancel_query.go +++ b/pkg/sql/cancel_queries.go @@ -25,43 +25,46 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/types" ) -type cancelQueryNode struct { - queryID tree.TypedExpr +type cancelQueriesNode struct { + rows planNode ifExists bool } -func (p *planner) CancelQuery(ctx context.Context, n *tree.CancelQuery) (planNode, error) { - typedQueryID, err := p.analyzeExpr( - ctx, - n.ID, - nil, - tree.IndexedVarHelper{}, - types.String, - true, /* requireType */ - "CANCEL QUERY", - ) +func (p *planner) CancelQueries(ctx context.Context, n *tree.CancelQueries) (planNode, error) { + rows, err := p.newPlan(ctx, n.Queries, []types.T{types.String}) if err != nil { return nil, err } + cols := planColumns(rows) + if len(cols) != 1 { + return nil, errors.Errorf("CANCEL QUERIES expects a single column source, got %d columns", len(cols)) + } + if !cols[0].Typ.Equivalent(types.String) { + return nil, errors.Errorf("CANCEL QUERIES requires string values, not type %s", cols[0].Typ) + } - return &cancelQueryNode{ - queryID: typedQueryID, + return &cancelQueriesNode{ + rows: rows, ifExists: n.IfExists, }, nil } -func (n *cancelQueryNode) startExec(params runParams) error { - statusServer := params.extendedEvalCtx.StatusServer +func (n *cancelQueriesNode) Next(params runParams) (bool, error) { + // TODO(knz): instead of performing the cancels sequentially, + // accumulate all the query IDs and then send batches to each of the + // nodes. - queryIDDatum, err := n.queryID.Eval(params.EvalContext()) - if err != nil { - return err + if ok, err := n.rows.Next(params); err != nil || !ok { + return ok, err } - queryIDString := tree.AsStringWithFlags(queryIDDatum, tree.FmtBareStrings) + statusServer := params.extendedEvalCtx.StatusServer + queryIDDatum := n.rows.Values()[0].(*tree.DString) + + queryIDString := string(*queryIDDatum) queryID, err := StringToClusterWideID(queryIDString) if err != nil { - return errors.Wrapf(err, "invalid query ID '%s'", queryIDString) + return false, errors.Wrapf(err, "invalid query ID '%s'", queryIDString) } // Get the lowest 32 bits of the query ID. @@ -75,16 +78,18 @@ func (n *cancelQueryNode) startExec(params runParams) error { response, err := statusServer.CancelQuery(params.ctx, request) if err != nil { - return err + return false, err } if !response.Canceled && !n.ifExists { - return fmt.Errorf("could not cancel query %s: %s", queryID, response.Error) + return false, fmt.Errorf("could not cancel query %s: %s", queryID, response.Error) } - return nil + return true, nil } -func (n *cancelQueryNode) Next(runParams) (bool, error) { return false, nil } -func (*cancelQueryNode) Values() tree.Datums { return nil } -func (*cancelQueryNode) Close(context.Context) {} +func (*cancelQueriesNode) Values() tree.Datums { return nil } + +func (n *cancelQueriesNode) Close(ctx context.Context) { + n.rows.Close(ctx) +} diff --git a/pkg/sql/expand_plan.go b/pkg/sql/expand_plan.go index 961a59940d7d..af5e3378d81e 100644 --- a/pkg/sql/expand_plan.go +++ b/pkg/sql/expand_plan.go @@ -333,12 +333,14 @@ func doExpandPlan( case *testingRelocateNode: n.rows, err = doExpandPlan(ctx, p, noParams, n.rows) + case *cancelQueriesNode: + n.rows, err = doExpandPlan(ctx, p, noParams, n.rows) + case *valuesNode: case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: - case *cancelQueryNode: case *cancelSessionNode: case *scrubNode: case *controlJobNode: @@ -812,12 +814,14 @@ func (p *planner) simplifyOrderings(plan planNode, usefulOrdering sqlbase.Column case *testingRelocateNode: n.rows = p.simplifyOrderings(n.rows, nil) + case *cancelQueriesNode: + n.rows = p.simplifyOrderings(n.rows, nil) + case *valuesNode: case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: - case *cancelQueryNode: case *cancelSessionNode: case *scrubNode: case *controlJobNode: diff --git a/pkg/sql/logictest/testdata/logic_test/run_control b/pkg/sql/logictest/testdata/logic_test/run_control index 303d902b2ce2..fd90cfa6a81c 100644 --- a/pkg/sql/logictest/testdata/logic_test/run_control +++ b/pkg/sql/logictest/testdata/logic_test/run_control @@ -27,11 +27,21 @@ CANCEL JOB 'foo' query error NULL is not a valid job ID CANCEL JOB (SELECT id FROM system.jobs LIMIT 0) -query error argument of CANCEL QUERY must be type string, not type int +query error CANCEL QUERIES requires string values, not type int CANCEL QUERY 1 +query error CANCEL QUERIES expects a single column source, got 2 columns +CANCEL QUERIES VALUES (1,2) + query error odd length hex string CANCEL QUERY 'f54' query error not found CANCEL QUERY '14d2355b9cccbca50000000000000001' + +statement ok count 0 +CANCEL QUERY (SELECT 'a' LIMIT 0) + +statement ok count 0 +CANCEL QUERIES SELECT 'a' LIMIT 0 + diff --git a/pkg/sql/opt_filters.go b/pkg/sql/opt_filters.go index 767f3881f9fe..227aaabbe881 100644 --- a/pkg/sql/opt_filters.go +++ b/pkg/sql/opt_filters.go @@ -339,11 +339,15 @@ func (p *planner) propagateFilters( return plan, extraFilter, err } + case *cancelQueriesNode: + if n.rows, err = p.triggerFilterPropagation(ctx, n.rows); err != nil { + return plan, extraFilter, err + } + case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: - case *cancelQueryNode: case *cancelSessionNode: case *scrubNode: case *controlJobNode: diff --git a/pkg/sql/opt_limits.go b/pkg/sql/opt_limits.go index 06cc69f21d5b..05d3c5607090 100644 --- a/pkg/sql/opt_limits.go +++ b/pkg/sql/opt_limits.go @@ -187,12 +187,14 @@ func (p *planner) applyLimit(plan planNode, numRows int64, soft bool) { case *testingRelocateNode: p.setUnlimited(n.rows) + case *cancelQueriesNode: + p.setUnlimited(n.rows) + case *valuesNode: case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: - case *cancelQueryNode: case *cancelSessionNode: case *scrubNode: case *controlJobNode: diff --git a/pkg/sql/opt_needed.go b/pkg/sql/opt_needed.go index f4899bbc1d46..1ae0d1b0ab9f 100644 --- a/pkg/sql/opt_needed.go +++ b/pkg/sql/opt_needed.go @@ -203,11 +203,13 @@ func setNeededColumns(plan planNode, needed []bool) { // The sub-node is a DELETE, INSERT, UPDATE etc. and will decide which columns it needs. setNeededColumns(n.source, nil) + case *cancelQueriesNode: + setNeededColumns(n.rows, allColumns(n.rows)) + case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: - case *cancelQueryNode: case *cancelSessionNode: case *controlJobNode: case *scrubNode: diff --git a/pkg/sql/parser/help_test.go b/pkg/sql/parser/help_test.go index a9ebd1494241..f49674d662db 100644 --- a/pkg/sql/parser/help_test.go +++ b/pkg/sql/parser/help_test.go @@ -72,9 +72,12 @@ func TestContextualHelp(t *testing.T) { {`CANCEL ??`, `CANCEL`}, {`CANCEL JOB ??`, `CANCEL JOB`}, - {`CANCEL QUERY ??`, `CANCEL QUERY`}, - {`CANCEL QUERY IF ??`, `CANCEL QUERY`}, - {`CANCEL QUERY IF EXISTS ??`, `CANCEL QUERY`}, + {`CANCEL QUERY ??`, `CANCEL QUERIES`}, + {`CANCEL QUERY IF ??`, `CANCEL QUERIES`}, + {`CANCEL QUERY IF EXISTS ??`, `CANCEL QUERIES`}, + {`CANCEL QUERIES ??`, `CANCEL QUERIES`}, + {`CANCEL QUERIES IF ??`, `CANCEL QUERIES`}, + {`CANCEL QUERIES IF EXISTS ??`, `CANCEL QUERIES`}, {`CANCEL SESSION ??`, `CANCEL SESSION`}, {`CANCEL SESSION IF ??`, `CANCEL SESSION`}, {`CANCEL SESSION IF EXISTS ??`, `CANCEL SESSION`}, diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index 55d85f68b1d4..1f93877f82c5 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -311,9 +311,9 @@ func TestParse(t *testing.T) { {`DROP SEQUENCE a, b CASCADE`}, {`CANCEL JOB a`}, - {`CANCEL QUERY a`}, + {`CANCEL QUERIES SELECT a`}, {`CANCEL SESSION a`}, - {`CANCEL QUERY IF EXISTS a`}, + {`CANCEL QUERIES IF EXISTS SELECT a`}, {`CANCEL SESSION IF EXISTS a`}, {`RESUME JOB a`}, {`PAUSE JOB a`}, @@ -415,10 +415,10 @@ func TestParse(t *testing.T) { {`PREPARE a (STRING) AS BACKUP DATABASE a TO $1`}, {`PREPARE a AS RESTORE DATABASE a FROM 'b'`}, {`PREPARE a (STRING) AS RESTORE DATABASE a FROM $1`}, - {`PREPARE a AS CANCEL QUERY 1`}, - {`PREPARE a (STRING) AS CANCEL QUERY $1`}, - {`PREPARE a AS CANCEL QUERY IF EXISTS 1`}, - {`PREPARE a (STRING) AS CANCEL QUERY IF EXISTS $1`}, + {`PREPARE a AS CANCEL QUERIES SELECT 1`}, + {`PREPARE a (STRING) AS CANCEL QUERIES SELECT $1`}, + {`PREPARE a AS CANCEL QUERIES IF EXISTS SELECT 1`}, + {`PREPARE a (STRING) AS CANCEL QUERIES IF EXISTS SELECT $1`}, {`PREPARE a AS CANCEL SESSION 1`}, {`PREPARE a (STRING) AS CANCEL SESSION $1`}, {`PREPARE a AS CANCEL SESSION IF EXISTS 1`}, @@ -1291,6 +1291,9 @@ func TestParse2(t *testing.T) { {`DEALLOCATE PREPARE ALL`, `DEALLOCATE ALL`}, + {`CANCEL QUERY a`, `CANCEL QUERIES VALUES (a)`}, + {`CANCEL QUERY IF EXISTS a`, `CANCEL QUERIES IF EXISTS VALUES (a)`}, + {`BACKUP DATABASE foo TO bar`, `BACKUP DATABASE foo TO 'bar'`}, {`BACKUP DATABASE foo TO "bar.12" INCREMENTAL FROM "baz.34"`, diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 4b80916a1d5f..3c152714deb3 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -606,7 +606,7 @@ func newNameFromStr(s string) *tree.Name { %type cancel_stmt %type cancel_job_stmt -%type cancel_query_stmt +%type cancel_queries_stmt %type cancel_session_stmt // SCRUB @@ -1705,10 +1705,10 @@ copy_from_stmt: // %Help: CANCEL // %Category: Group -// %Text: CANCEL JOB, CANCEL QUERY, CANCEL SESSION +// %Text: CANCEL JOB, CANCEL QUERIES, CANCEL SESSION cancel_stmt: cancel_job_stmt // EXTEND WITH HELP: CANCEL JOB -| cancel_query_stmt // EXTEND WITH HELP: CANCEL QUERY +| cancel_queries_stmt // EXTEND WITH HELP: CANCEL QUERIES | cancel_session_stmt // EXTEND WITH HELP: CANCEL SESSION | CANCEL error // SHOW HELP: CANCEL @@ -1723,20 +1723,41 @@ cancel_job_stmt: } | CANCEL JOB error // SHOW HELP: CANCEL JOB -// %Help: CANCEL QUERY - cancel a running query +// %Help: CANCEL QUERIES - cancel running queries // %Category: Misc -// %Text: CANCEL QUERY [IF EXISTS] +// %Text: +// CANCEL QUERIES [IF EXISTS] +// CANCEL QUERY [IF EXISTS] // %SeeAlso: SHOW QUERIES -cancel_query_stmt: +cancel_queries_stmt: CANCEL QUERY a_expr { - $$.val = &tree.CancelQuery{ID: $3.expr(), IfExists: false} + $$.val = &tree.CancelQueries{ + Queries: &tree.Select{ + Select: &tree.ValuesClause{Tuples: []*tree.Tuple{{Exprs: tree.Exprs{$3.expr()}}}}, + }, + IfExists: false, + } } | CANCEL QUERY IF EXISTS a_expr { - $$.val = &tree.CancelQuery{ID: $5.expr(), IfExists: true} + $$.val = &tree.CancelQueries{ + Queries: &tree.Select{ + Select: &tree.ValuesClause{Tuples: []*tree.Tuple{{Exprs: tree.Exprs{$5.expr()}}}}, + }, + IfExists: true, + } + } +| CANCEL QUERY error // SHOW HELP: CANCEL QUERIES +| CANCEL QUERIES select_stmt + { + $$.val = &tree.CancelQueries{Queries: $3.slct(), IfExists: false} + } +| CANCEL QUERIES IF EXISTS select_stmt + { + $$.val = &tree.CancelQueries{Queries: $5.slct(), IfExists: true} } -| CANCEL QUERY error // SHOW HELP: CANCEL QUERY +| CANCEL QUERIES error // SHOW HELP: CANCEL QUERIES // %Help: CANCEL SESSION - cancel an open session // %Category: Misc @@ -2848,7 +2869,7 @@ show_constraints_stmt: // %Help: SHOW QUERIES - list running queries // %Category: Misc // %Text: SHOW [CLUSTER | LOCAL] QUERIES -// %SeeAlso: CANCEL QUERY +// %SeeAlso: CANCEL QUERIES show_queries_stmt: SHOW QUERIES { diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index e9d897712b72..b3cfd3ffaaca 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -637,8 +637,8 @@ func (p *planner) newPlan( return p.AlterSequence(ctx, n) case *tree.AlterUserSetPassword: return p.AlterUserSetPassword(ctx, n) - case *tree.CancelQuery: - return p.CancelQuery(ctx, n) + case *tree.CancelQueries: + return p.CancelQueries(ctx, n) case *tree.CancelSession: return p.CancelSession(ctx, n) case *tree.CancelJob: @@ -820,8 +820,8 @@ func (p *planner) doPrepare(ctx context.Context, stmt tree.Statement) (planNode, switch n := stmt.(type) { case *tree.AlterUserSetPassword: return p.AlterUserSetPassword(ctx, n) - case *tree.CancelQuery: - return p.CancelQuery(ctx, n) + case *tree.CancelQueries: + return p.CancelQueries(ctx, n) case *tree.CancelSession: return p.CancelSession(ctx, n) case *tree.CancelJob: diff --git a/pkg/sql/sem/tree/run_control.go b/pkg/sql/sem/tree/run_control.go index c9d5460fc2dc..e664623697c5 100644 --- a/pkg/sql/sem/tree/run_control.go +++ b/pkg/sql/sem/tree/run_control.go @@ -47,19 +47,19 @@ func (node *CancelJob) Format(ctx *FmtCtx) { ctx.FormatNode(node.ID) } -// CancelQuery represents a CANCEL QUERY statement. -type CancelQuery struct { - ID Expr +// CancelQueries represents a CANCEL QUERIES statement. +type CancelQueries struct { + Queries *Select IfExists bool } // Format implements the NodeFormatter interface. -func (node *CancelQuery) Format(ctx *FmtCtx) { - ctx.WriteString("CANCEL QUERY ") +func (node *CancelQueries) Format(ctx *FmtCtx) { + ctx.WriteString("CANCEL QUERIES ") if node.IfExists { ctx.WriteString("IF EXISTS ") } - ctx.FormatNode(node.ID) + ctx.FormatNode(node.Queries) } // CancelSession represents a CANCEL SESSION statement. diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go index 00be78381ad6..6d7d8c206708 100644 --- a/pkg/sql/sem/tree/stmt.go +++ b/pkg/sql/sem/tree/stmt.go @@ -125,7 +125,7 @@ type HiddenFromStats interface { // HiddenFromShowQueries is a pseudo-interface to be implemented // by statements that should not show up in SHOW QUERIES (and are hence -// not cancellable using CANCEL QUERY either). Usually implemented by +// not cancellable using CANCEL QUERIES either). Usually implemented by // statements that spawn jobs. type HiddenFromShowQueries interface { hiddenFromShowQueries() @@ -213,12 +213,12 @@ func (*CancelJob) StatementType() StatementType { return Ack } func (*CancelJob) StatementTag() string { return "CANCEL JOB" } // StatementType implements the Statement interface. -func (*CancelQuery) StatementType() StatementType { return Ack } +func (*CancelQueries) StatementType() StatementType { return RowsAffected } // StatementTag returns a short string identifying the type of statement. -func (*CancelQuery) StatementTag() string { return "CANCEL QUERY" } +func (*CancelQueries) StatementTag() string { return "CANCEL QUERIES" } -func (*CancelQuery) independentFromParallelizedPriors() {} +func (*CancelQueries) independentFromParallelizedPriors() {} // StatementType implements the Statement interface. func (*CancelSession) StatementType() StatementType { return Ack } @@ -889,7 +889,7 @@ func (n *AlterSequence) String() string { return AsString(n) } func (n *Backup) String() string { return AsString(n) } func (n *BeginTransaction) String() string { return AsString(n) } func (n *CancelJob) String() string { return AsString(n) } -func (n *CancelQuery) String() string { return AsString(n) } +func (n *CancelQueries) String() string { return AsString(n) } func (n *CancelSession) String() string { return AsString(n) } func (n *CommitTransaction) String() string { return AsString(n) } func (n *CopyFrom) String() string { return AsString(n) } diff --git a/pkg/sql/sem/tree/walk.go b/pkg/sql/sem/tree/walk.go index bc732a641b1d..632d013ffc5a 100644 --- a/pkg/sql/sem/tree/walk.go +++ b/pkg/sql/sem/tree/walk.go @@ -783,6 +783,22 @@ func (stmt *CreateTable) WalkStmt(v Visitor) Statement { return ret } +// CopyNode makes a copy of this Statement without recursing in any child Statements. +func (stmt *CancelQueries) CopyNode() *CancelQueries { + stmtCopy := *stmt + return &stmtCopy +} + +// WalkStmt is part of the WalkableStmt interface. +func (stmt *CancelQueries) WalkStmt(v Visitor) Statement { + sel, changed := WalkStmt(v, stmt.Queries) + if changed { + stmt = stmt.CopyNode() + stmt.Queries = sel.(*Select) + } + return stmt +} + // CopyNode makes a copy of this Statement without recursing in any child Statements. func (stmt *Import) CopyNode() *Import { stmtCopy := *stmt @@ -1161,6 +1177,7 @@ var _ WalkableStmt = &SetClusterSetting{} var _ WalkableStmt = &SetVar{} var _ WalkableStmt = &Update{} var _ WalkableStmt = &ValuesClause{} +var _ WalkableStmt = &CancelQueries{} // WalkStmt walks the entire parsed stmt calling WalkExpr on each // expression, and replacing each expression with the one returned diff --git a/pkg/sql/walk.go b/pkg/sql/walk.go index daab8a3d41ff..533d9fed5420 100644 --- a/pkg/sql/walk.go +++ b/pkg/sql/walk.go @@ -501,10 +501,8 @@ func (v *planVisitor) visit(plan planNode) { } v.visit(n.plan) - case *cancelQueryNode: - if v.observer.expr != nil { - v.expr(name, "queryID", -1, n.queryID) - } + case *cancelQueriesNode: + v.visit(n.rows) case *cancelSessionNode: if v.observer.expr != nil { @@ -564,7 +562,7 @@ var planNodeNames = map[reflect.Type]string{ reflect.TypeOf(&alterTableNode{}): "alter table", reflect.TypeOf(&alterSequenceNode{}): "alter sequence", reflect.TypeOf(&alterUserSetPasswordNode{}): "alter user", - reflect.TypeOf(&cancelQueryNode{}): "cancel query", + reflect.TypeOf(&cancelQueriesNode{}): "cancel queries", reflect.TypeOf(&cancelSessionNode{}): "cancel session", reflect.TypeOf(&controlJobNode{}): "control job", reflect.TypeOf(&createDatabaseNode{}): "create database", From d56b53fc23503d574b83e4d15bbf71d45839f8a2 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Sun, 29 Apr 2018 17:32:33 +0200 Subject: [PATCH 2/3] sql: make CANCEL SESSION (-> SESSIONS) support multiple sessions Prior to this patch, `CANCEL SESSION` would accept a single expression as argument. This patch extends the syntax to accept an arbitrary number of query IDs, via a selection query. The syntax becomes `CANCEL SESSIONS `. The syntax `CANCEL SESSION` with a single expression is preserved for backward compatibility, and is desugared to `CANCEL SESSIONS VALUES (...)` during parsing. Sessions are canceled sequentially in the order given by the selection. If a session cannot be canceled and IF EXISTS is not specified, an error occurs and no further session cancellation is attempted. In addition, the statement now returns the number of session cancellations processed (including non-existent sessions, if IF EXISTS is specified). Release note (sql change): The `CANCEL SESSION` statement is extended with a variant `CANCEL SESSIONS` able to cancel multiple sessions at once. For example, to cancel all sessions on node 1 run `CANCEL SESSIONS SELECT session_id FROM [SHOW CLUSTER SESSIONS] WHERE node_id = 1`. --- docs/generated/sql/bnf/cancel_session.bnf | 5 ++ docs/generated/sql/bnf/stmt_block.bnf | 6 +- pkg/cmd/docgen/diagrams.go | 1 + .../{cancel_session.go => cancel_sessions.go} | 68 +++++++++++-------- pkg/sql/expand_plan.go | 8 ++- .../logictest/testdata/logic_test/run_control | 17 ++++- pkg/sql/opt_filters.go | 6 +- pkg/sql/opt_limits.go | 4 +- pkg/sql/opt_needed.go | 4 +- pkg/sql/parser/help_test.go | 9 ++- pkg/sql/parser/parse_test.go | 14 ++-- pkg/sql/parser/sql.y | 47 +++++++++---- pkg/sql/pgwire/pgwire_test.go | 18 +++++ pkg/sql/plan.go | 8 +-- pkg/sql/run_control_test.go | 48 ++++++++++++- pkg/sql/sem/tree/run_control.go | 12 ++-- pkg/sql/sem/tree/stmt.go | 8 +-- pkg/sql/sem/tree/walk.go | 17 +++++ pkg/sql/walk.go | 8 +-- 19 files changed, 225 insertions(+), 83 deletions(-) create mode 100644 docs/generated/sql/bnf/cancel_session.bnf rename pkg/sql/{cancel_session.go => cancel_sessions.go} (50%) diff --git a/docs/generated/sql/bnf/cancel_session.bnf b/docs/generated/sql/bnf/cancel_session.bnf new file mode 100644 index 000000000000..c184dc24bb09 --- /dev/null +++ b/docs/generated/sql/bnf/cancel_session.bnf @@ -0,0 +1,5 @@ +cancel_sessions_stmt ::= + 'CANCEL' 'SESSION' session_id + | 'CANCEL' 'SESSION' 'IF' 'EXISTS' session_id + | 'CANCEL' 'SESSIONS' select_stmt + | 'CANCEL' 'SESSIONS' 'IF' 'EXISTS' select_stmt diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index 7e6c8eda255d..cf781525a97d 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -49,7 +49,7 @@ backup_stmt ::= cancel_stmt ::= cancel_job_stmt | cancel_queries_stmt - | cancel_session_stmt + | cancel_sessions_stmt copy_from_stmt ::= 'COPY' table_name opt_column_list 'FROM' 'STDIN' @@ -226,9 +226,11 @@ cancel_queries_stmt ::= | 'CANCEL' 'QUERIES' select_stmt | 'CANCEL' 'QUERIES' 'IF' 'EXISTS' select_stmt -cancel_session_stmt ::= +cancel_sessions_stmt ::= 'CANCEL' 'SESSION' a_expr | 'CANCEL' 'SESSION' 'IF' 'EXISTS' a_expr + | 'CANCEL' 'SESSIONS' select_stmt + | 'CANCEL' 'SESSIONS' 'IF' 'EXISTS' select_stmt table_name ::= db_object_name diff --git a/pkg/cmd/docgen/diagrams.go b/pkg/cmd/docgen/diagrams.go index adbb6d3f1792..adc1ff943eb7 100644 --- a/pkg/cmd/docgen/diagrams.go +++ b/pkg/cmd/docgen/diagrams.go @@ -438,6 +438,7 @@ var specs = []stmtSpec{ match: []*regexp.Regexp{regexp.MustCompile("'COMMIT'|'END'")}, }, {name: "cancel_query", stmt: "cancel_queries_stmt", replace: map[string]string{"a_expr": "query_id"}, unlink: []string{"query_id"}}, + {name: "cancel_session", stmt: "cancel_sessions_stmt", replace: map[string]string{"a_expr": "session_id"}, unlink: []string{"session_id"}}, {name: "create_database_stmt", inline: []string{"opt_encoding_clause"}, replace: map[string]string{"'SCONST'": "encoding"}, unlink: []string{"name", "encoding"}}, { name: "create_index_stmt", diff --git a/pkg/sql/cancel_session.go b/pkg/sql/cancel_sessions.go similarity index 50% rename from pkg/sql/cancel_session.go rename to pkg/sql/cancel_sessions.go index 55d738bbc4b1..3f8cc25d7664 100644 --- a/pkg/sql/cancel_session.go +++ b/pkg/sql/cancel_sessions.go @@ -24,43 +24,51 @@ import ( "github.com/pkg/errors" ) -type cancelSessionNode struct { - sessionID tree.TypedExpr - ifExists bool +type cancelSessionsNode struct { + rows planNode + ifExists bool } -func (p *planner) CancelSession(ctx context.Context, n *tree.CancelSession) (planNode, error) { - typedSessionID, err := p.analyzeExpr( - ctx, - n.ID, - nil, - tree.IndexedVarHelper{}, - types.String, - true, /* requireType */ - "CANCEL SESSION", - ) +func (p *planner) CancelSessions(ctx context.Context, n *tree.CancelSessions) (planNode, error) { + rows, err := p.newPlan(ctx, n.Sessions, []types.T{types.String}) if err != nil { return nil, err } + cols := planColumns(rows) + if len(cols) != 1 { + return nil, errors.Errorf("CANCEL SESSIONS expects a single column source, got %d columns", len(cols)) + } + if !cols[0].Typ.Equivalent(types.String) { + return nil, errors.Errorf("CANCEL SESSIONS requires string values, not type %s", cols[0].Typ) + } - return &cancelSessionNode{ - sessionID: typedSessionID, - ifExists: n.IfExists, + return &cancelSessionsNode{ + rows: rows, + ifExists: n.IfExists, }, nil } -func (n *cancelSessionNode) startExec(params runParams) error { - statusServer := params.extendedEvalCtx.StatusServer +func (n *cancelSessionsNode) Next(params runParams) (bool, error) { + // TODO(knz): instead of performing the cancels sequentially, + // accumulate all the query IDs and then send batches to each of the + // nodes. - sessionIDDatum, err := n.sessionID.Eval(params.EvalContext()) - if err != nil { - return err + if ok, err := n.rows.Next(params); err != nil || !ok { + return ok, err + } + + datum := n.rows.Values()[0] + if datum == tree.DNull { + return true, nil } - sessionIDString := tree.AsStringWithFlags(sessionIDDatum, tree.FmtBareStrings) + statusServer := params.extendedEvalCtx.StatusServer + sessionIDDatum := datum.(*tree.DString) + + sessionIDString := string(*sessionIDDatum) sessionID, err := StringToClusterWideID(sessionIDString) if err != nil { - return errors.Wrapf(err, "invalid session ID '%s'", sessionIDString) + return false, errors.Wrapf(err, "invalid session ID '%s'", sessionIDString) } // Get the lowest 32 bits of the session ID. @@ -74,16 +82,18 @@ func (n *cancelSessionNode) startExec(params runParams) error { response, err := statusServer.CancelSession(params.ctx, request) if err != nil { - return err + return false, err } if !response.Canceled && !n.ifExists { - return fmt.Errorf("could not cancel session %s: %s", sessionID, response.Error) + return false, fmt.Errorf("could not cancel session %s: %s", sessionID, response.Error) } - return nil + return true, nil } -func (n *cancelSessionNode) Next(runParams) (bool, error) { return false, nil } -func (*cancelSessionNode) Values() tree.Datums { return nil } -func (*cancelSessionNode) Close(context.Context) {} +func (*cancelSessionsNode) Values() tree.Datums { return nil } + +func (n *cancelSessionsNode) Close(ctx context.Context) { + n.rows.Close(ctx) +} diff --git a/pkg/sql/expand_plan.go b/pkg/sql/expand_plan.go index af5e3378d81e..f03c6fe55f77 100644 --- a/pkg/sql/expand_plan.go +++ b/pkg/sql/expand_plan.go @@ -336,12 +336,14 @@ func doExpandPlan( case *cancelQueriesNode: n.rows, err = doExpandPlan(ctx, p, noParams, n.rows) + case *cancelSessionsNode: + n.rows, err = doExpandPlan(ctx, p, noParams, n.rows) + case *valuesNode: case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: - case *cancelSessionNode: case *scrubNode: case *controlJobNode: case *createDatabaseNode: @@ -817,12 +819,14 @@ func (p *planner) simplifyOrderings(plan planNode, usefulOrdering sqlbase.Column case *cancelQueriesNode: n.rows = p.simplifyOrderings(n.rows, nil) + case *cancelSessionsNode: + n.rows = p.simplifyOrderings(n.rows, nil) + case *valuesNode: case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: - case *cancelSessionNode: case *scrubNode: case *controlJobNode: case *createDatabaseNode: diff --git a/pkg/sql/logictest/testdata/logic_test/run_control b/pkg/sql/logictest/testdata/logic_test/run_control index fd90cfa6a81c..e365139e1738 100644 --- a/pkg/sql/logictest/testdata/logic_test/run_control +++ b/pkg/sql/logictest/testdata/logic_test/run_control @@ -39,9 +39,20 @@ CANCEL QUERY 'f54' query error not found CANCEL QUERY '14d2355b9cccbca50000000000000001' -statement ok count 0 -CANCEL QUERY (SELECT 'a' LIMIT 0) +query error CANCEL SESSIONS requires string values, not type int +CANCEL SESSION 1 + +query error CANCEL SESSIONS expects a single column source, got 2 columns +CANCEL SESSIONS VALUES (1,2) + +query error odd length hex string +CANCEL SESSION 'f54' + +query error not found +CANCEL SESSION '14d2355b9cccbca50000000000000001' statement ok count 0 -CANCEL QUERIES SELECT 'a' LIMIT 0 +CANCEL SESSION (SELECT 'a' LIMIT 0) +statement ok count 0 +CANCEL SESSIONS SELECT 'a' LIMIT 0 diff --git a/pkg/sql/opt_filters.go b/pkg/sql/opt_filters.go index 227aaabbe881..50602b726fca 100644 --- a/pkg/sql/opt_filters.go +++ b/pkg/sql/opt_filters.go @@ -344,11 +344,15 @@ func (p *planner) propagateFilters( return plan, extraFilter, err } + case *cancelSessionsNode: + if n.rows, err = p.triggerFilterPropagation(ctx, n.rows); err != nil { + return plan, extraFilter, err + } + case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: - case *cancelSessionNode: case *scrubNode: case *controlJobNode: case *createDatabaseNode: diff --git a/pkg/sql/opt_limits.go b/pkg/sql/opt_limits.go index 05d3c5607090..14d82ba6b2f3 100644 --- a/pkg/sql/opt_limits.go +++ b/pkg/sql/opt_limits.go @@ -190,12 +190,14 @@ func (p *planner) applyLimit(plan planNode, numRows int64, soft bool) { case *cancelQueriesNode: p.setUnlimited(n.rows) + case *cancelSessionsNode: + p.setUnlimited(n.rows) + case *valuesNode: case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: - case *cancelSessionNode: case *scrubNode: case *controlJobNode: case *createDatabaseNode: diff --git a/pkg/sql/opt_needed.go b/pkg/sql/opt_needed.go index 1ae0d1b0ab9f..bf7f4eae341b 100644 --- a/pkg/sql/opt_needed.go +++ b/pkg/sql/opt_needed.go @@ -206,11 +206,13 @@ func setNeededColumns(plan planNode, needed []bool) { case *cancelQueriesNode: setNeededColumns(n.rows, allColumns(n.rows)) + case *cancelSessionsNode: + setNeededColumns(n.rows, allColumns(n.rows)) + case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: - case *cancelSessionNode: case *controlJobNode: case *scrubNode: case *createDatabaseNode: diff --git a/pkg/sql/parser/help_test.go b/pkg/sql/parser/help_test.go index f49674d662db..9af2e319e218 100644 --- a/pkg/sql/parser/help_test.go +++ b/pkg/sql/parser/help_test.go @@ -78,9 +78,12 @@ func TestContextualHelp(t *testing.T) { {`CANCEL QUERIES ??`, `CANCEL QUERIES`}, {`CANCEL QUERIES IF ??`, `CANCEL QUERIES`}, {`CANCEL QUERIES IF EXISTS ??`, `CANCEL QUERIES`}, - {`CANCEL SESSION ??`, `CANCEL SESSION`}, - {`CANCEL SESSION IF ??`, `CANCEL SESSION`}, - {`CANCEL SESSION IF EXISTS ??`, `CANCEL SESSION`}, + {`CANCEL SESSION ??`, `CANCEL SESSIONS`}, + {`CANCEL SESSION IF ??`, `CANCEL SESSIONS`}, + {`CANCEL SESSION IF EXISTS ??`, `CANCEL SESSIONS`}, + {`CANCEL SESSIONS ??`, `CANCEL SESSIONS`}, + {`CANCEL SESSIONS IF ??`, `CANCEL SESSIONS`}, + {`CANCEL SESSIONS IF EXISTS ??`, `CANCEL SESSIONS`}, {`CREATE UNIQUE ??`, `CREATE`}, {`CREATE UNIQUE INDEX ??`, `CREATE INDEX`}, diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index 1f93877f82c5..6f26d0683488 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -312,9 +312,9 @@ func TestParse(t *testing.T) { {`CANCEL JOB a`}, {`CANCEL QUERIES SELECT a`}, - {`CANCEL SESSION a`}, + {`CANCEL SESSIONS SELECT a`}, {`CANCEL QUERIES IF EXISTS SELECT a`}, - {`CANCEL SESSION IF EXISTS a`}, + {`CANCEL SESSIONS IF EXISTS SELECT a`}, {`RESUME JOB a`}, {`PAUSE JOB a`}, @@ -419,10 +419,10 @@ func TestParse(t *testing.T) { {`PREPARE a (STRING) AS CANCEL QUERIES SELECT $1`}, {`PREPARE a AS CANCEL QUERIES IF EXISTS SELECT 1`}, {`PREPARE a (STRING) AS CANCEL QUERIES IF EXISTS SELECT $1`}, - {`PREPARE a AS CANCEL SESSION 1`}, - {`PREPARE a (STRING) AS CANCEL SESSION $1`}, - {`PREPARE a AS CANCEL SESSION IF EXISTS 1`}, - {`PREPARE a (STRING) AS CANCEL SESSION IF EXISTS $1`}, + {`PREPARE a AS CANCEL SESSIONS SELECT 1`}, + {`PREPARE a (STRING) AS CANCEL SESSIONS SELECT $1`}, + {`PREPARE a AS CANCEL SESSIONS IF EXISTS SELECT 1`}, + {`PREPARE a (STRING) AS CANCEL SESSIONS IF EXISTS SELECT $1`}, {`PREPARE a AS CANCEL JOB 1`}, {`PREPARE a (INT) AS CANCEL JOB $1`}, {`PREPARE a AS PAUSE JOB 1`}, @@ -1293,6 +1293,8 @@ func TestParse2(t *testing.T) { {`CANCEL QUERY a`, `CANCEL QUERIES VALUES (a)`}, {`CANCEL QUERY IF EXISTS a`, `CANCEL QUERIES IF EXISTS VALUES (a)`}, + {`CANCEL SESSION a`, `CANCEL SESSIONS VALUES (a)`}, + {`CANCEL SESSION IF EXISTS a`, `CANCEL SESSIONS IF EXISTS VALUES (a)`}, {`BACKUP DATABASE foo TO bar`, `BACKUP DATABASE foo TO 'bar'`}, diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 3c152714deb3..58d440b4c78e 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -607,7 +607,7 @@ func newNameFromStr(s string) *tree.Name { %type cancel_stmt %type cancel_job_stmt %type cancel_queries_stmt -%type cancel_session_stmt +%type cancel_sessions_stmt // SCRUB %type scrub_stmt @@ -1705,12 +1705,12 @@ copy_from_stmt: // %Help: CANCEL // %Category: Group -// %Text: CANCEL JOB, CANCEL QUERIES, CANCEL SESSION +// %Text: CANCEL JOB, CANCEL QUERIES, CANCEL SESSIONS cancel_stmt: - cancel_job_stmt // EXTEND WITH HELP: CANCEL JOB -| cancel_queries_stmt // EXTEND WITH HELP: CANCEL QUERIES -| cancel_session_stmt // EXTEND WITH HELP: CANCEL SESSION -| CANCEL error // SHOW HELP: CANCEL + cancel_job_stmt // EXTEND WITH HELP: CANCEL JOB +| cancel_queries_stmt // EXTEND WITH HELP: CANCEL QUERIES +| cancel_sessions_stmt // EXTEND WITH HELP: CANCEL SESSIONS +| CANCEL error // SHOW HELP: CANCEL // %Help: CANCEL JOB - cancel a background job // %Category: Misc @@ -1759,20 +1759,41 @@ cancel_queries_stmt: } | CANCEL QUERIES error // SHOW HELP: CANCEL QUERIES -// %Help: CANCEL SESSION - cancel an open session +// %Help: CANCEL SESSIONS - cancel open sessions // %Category: Misc -// %Text: CANCEL SESSION [IF EXISTS] +// %Text: +// CANCEL SESSIONS [IF EXISTS] +// CANCEL SESSION [IF EXISTS] // %SeeAlso: SHOW SESSIONS -cancel_session_stmt: +cancel_sessions_stmt: CANCEL SESSION a_expr { - $$.val = &tree.CancelSession{ID: $3.expr(), IfExists: false} + $$.val = &tree.CancelSessions{ + Sessions: &tree.Select{ + Select: &tree.ValuesClause{Tuples: []*tree.Tuple{{Exprs: tree.Exprs{$3.expr()}}}}, + }, + IfExists: false, + } } | CANCEL SESSION IF EXISTS a_expr { - $$.val = &tree.CancelSession{ID: $5.expr(), IfExists: true} + $$.val = &tree.CancelSessions{ + Sessions: &tree.Select{ + Select: &tree.ValuesClause{Tuples: []*tree.Tuple{{Exprs: tree.Exprs{$5.expr()}}}}, + }, + IfExists: true, + } + } +| CANCEL SESSION error // SHOW HELP: CANCEL SESSIONS +| CANCEL SESSIONS select_stmt + { + $$.val = &tree.CancelSessions{Sessions: $3.slct(), IfExists: false} + } +| CANCEL SESSIONS IF EXISTS select_stmt + { + $$.val = &tree.CancelSessions{Sessions: $5.slct(), IfExists: true} } -| CANCEL SESSION error // SHOW HELP: CANCEL SESSION +| CANCEL SESSIONS error // SHOW HELP: CANCEL SESSIONS comment_stmt: COMMENT ON TABLE table_name IS comment_text @@ -2934,7 +2955,7 @@ opt_compact: // %Help: SHOW SESSIONS - list open client sessions // %Category: Misc // %Text: SHOW [CLUSTER | LOCAL] SESSIONS -// %SeeAlso: CANCEL SESSION +// %SeeAlso: CANCEL SESSIONS show_sessions_stmt: SHOW SESSIONS { diff --git a/pkg/sql/pgwire/pgwire_test.go b/pkg/sql/pgwire/pgwire_test.go index 5b54ee3c32e3..c06ab80c63d4 100644 --- a/pkg/sql/pgwire/pgwire_test.go +++ b/pkg/sql/pgwire/pgwire_test.go @@ -1254,6 +1254,24 @@ func TestPGPreparedExec(t *testing.T) { baseTest.SetArgs("01").Error("pq: could not cancel query 00000000000000000000000000000001: query ID 00000000000000000000000000000001 not found"), }, }, + { + "CANCEL QUERIES SELECT $1", + []preparedExecTest{ + baseTest.SetArgs("01").Error("pq: could not cancel query 00000000000000000000000000000001: query ID 00000000000000000000000000000001 not found"), + }, + }, + { + "CANCEL SESSION $1", + []preparedExecTest{ + baseTest.SetArgs("01").Error("pq: could not cancel session 00000000000000000000000000000001: session ID 00000000000000000000000000000001 not found"), + }, + }, + { + "CANCEL SESSIONS SELECT $1", + []preparedExecTest{ + baseTest.SetArgs("01").Error("pq: could not cancel session 00000000000000000000000000000001: session ID 00000000000000000000000000000001 not found"), + }, + }, // An empty string is valid in postgres. { "", diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index b3cfd3ffaaca..442b8da44055 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -639,8 +639,8 @@ func (p *planner) newPlan( return p.AlterUserSetPassword(ctx, n) case *tree.CancelQueries: return p.CancelQueries(ctx, n) - case *tree.CancelSession: - return p.CancelSession(ctx, n) + case *tree.CancelSessions: + return p.CancelSessions(ctx, n) case *tree.CancelJob: return p.CancelJob(ctx, n) case *tree.Scrub: @@ -822,8 +822,8 @@ func (p *planner) doPrepare(ctx context.Context, stmt tree.Statement) (planNode, return p.AlterUserSetPassword(ctx, n) case *tree.CancelQueries: return p.CancelQueries(ctx, n) - case *tree.CancelSession: - return p.CancelSession(ctx, n) + case *tree.CancelSessions: + return p.CancelSessions(ctx, n) case *tree.CancelJob: return p.CancelJob(ctx, n) case *tree.CreateUser: diff --git a/pkg/sql/run_control_test.go b/pkg/sql/run_control_test.go index bdb220ecb03b..7a61b6bf9090 100644 --- a/pkg/sql/run_control_test.go +++ b/pkg/sql/run_control_test.go @@ -75,7 +75,7 @@ func TestCancelSelectQuery(t *testing.T) { <-sem time.Sleep(time.Second * 2) - const cancelQuery = "CANCEL QUERY (SELECT query_id FROM [SHOW CLUSTER QUERIES] WHERE node_id = 2)" + const cancelQuery = "CANCEL QUERIES SELECT query_id FROM [SHOW CLUSTER QUERIES] WHERE node_id = 2" if _, err := conn1.Exec(cancelQuery); err != nil { t.Fatal(err) @@ -146,7 +146,7 @@ func TestCancelParallelQuery(t *testing.T) { // Cancel this query, even though it has already completed execution. // The other query (queryToBlock) should return a cancellation error. - const cancelQuery = "CANCEL QUERY (SELECT query_id FROM [SHOW CLUSTER QUERIES] WHERE node_id = 1 AND query LIKE '%INSERT INTO nums2 VALUES (2%')" + const cancelQuery = "CANCEL QUERIES SELECT query_id FROM [SHOW CLUSTER QUERIES] WHERE node_id = 1 AND query LIKE '%INSERT INTO nums2 VALUES (2%'" if _, err := conn2.Exec(cancelQuery); err != nil { errChan2 <- err } @@ -192,7 +192,7 @@ func TestCancelParallelQuery(t *testing.T) { func TestCancelDistSQLQuery(t *testing.T) { defer leaktest.AfterTest(t)() const queryToCancel = "SELECT * FROM nums ORDER BY num" - cancelQuery := fmt.Sprintf("CANCEL QUERY (SELECT query_id FROM [SHOW CLUSTER QUERIES] WHERE query = '%s')", queryToCancel) + cancelQuery := fmt.Sprintf("CANCEL QUERIES SELECT query_id FROM [SHOW CLUSTER QUERIES] WHERE query = '%s'", queryToCancel) // conn1 is used for the query above. conn2 is solely for the CANCEL statement. var conn1 *gosql.DB @@ -368,6 +368,48 @@ func testCancelSession(t *testing.T, hasActiveSession bool) { } } +func TestCancelMultipleSessions(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.TODO() + + numNodes := 2 + tc := serverutils.StartTestCluster(t, numNodes, + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + }) + defer tc.Stopper().Stop(ctx) + + // Open two connections on node 1. + var conns [2]*gosql.Conn + for i := 0; i < 2; i++ { + var err error + conns[i], err = tc.ServerConn(0).Conn(ctx) + if err != nil { + t.Fatal(err) + } + } + // Open a control connection on node 2. + ctlconn, err := tc.ServerConn(1).Conn(ctx) + if err != nil { + t.Fatal(err) + } + + // Cancel the sessions on node 1. + if _, err = ctlconn.ExecContext(ctx, + `CANCEL SESSIONS SELECT session_id FROM [SHOW CLUSTER SESSIONS] WHERE node_id = 1`, + ); err != nil { + t.Fatal(err) + } + + // Verify that the connections on node 1 are closed. + for i := 0; i < 2; i++ { + _, err := conns[i].ExecContext(ctx, "SELECT 1") + if err != gosqldriver.ErrBadConn { + t.Fatalf("session %d not canceled; actual error: %s", i, err) + } + } +} + func TestIdleCancelSession(t *testing.T) { defer leaktest.AfterTest(t)() testCancelSession(t, false /* hasActiveSession */) diff --git a/pkg/sql/sem/tree/run_control.go b/pkg/sql/sem/tree/run_control.go index e664623697c5..0893d4c00cac 100644 --- a/pkg/sql/sem/tree/run_control.go +++ b/pkg/sql/sem/tree/run_control.go @@ -62,17 +62,17 @@ func (node *CancelQueries) Format(ctx *FmtCtx) { ctx.FormatNode(node.Queries) } -// CancelSession represents a CANCEL SESSION statement. -type CancelSession struct { - ID Expr +// CancelSessions represents a CANCEL SESSIONS statement. +type CancelSessions struct { + Sessions *Select IfExists bool } // Format implements the NodeFormatter interface. -func (node *CancelSession) Format(ctx *FmtCtx) { - ctx.WriteString("CANCEL SESSION ") +func (node *CancelSessions) Format(ctx *FmtCtx) { + ctx.WriteString("CANCEL SESSIONS ") if node.IfExists { ctx.WriteString("IF EXISTS ") } - ctx.FormatNode(node.ID) + ctx.FormatNode(node.Sessions) } diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go index 6d7d8c206708..013f3d550d67 100644 --- a/pkg/sql/sem/tree/stmt.go +++ b/pkg/sql/sem/tree/stmt.go @@ -221,12 +221,12 @@ func (*CancelQueries) StatementTag() string { return "CANCEL QUERIES" } func (*CancelQueries) independentFromParallelizedPriors() {} // StatementType implements the Statement interface. -func (*CancelSession) StatementType() StatementType { return Ack } +func (*CancelSessions) StatementType() StatementType { return RowsAffected } // StatementTag returns a short string identifying the type of statement. -func (*CancelSession) StatementTag() string { return "CANCEL SESSION" } +func (*CancelSessions) StatementTag() string { return "CANCEL SESSIONS" } -func (*CancelSession) independentFromParallelizedPriors() {} +func (*CancelSessions) independentFromParallelizedPriors() {} // StatementType implements the Statement interface. func (*CommitTransaction) StatementType() StatementType { return Ack } @@ -890,7 +890,7 @@ func (n *Backup) String() string { return AsString(n) } func (n *BeginTransaction) String() string { return AsString(n) } func (n *CancelJob) String() string { return AsString(n) } func (n *CancelQueries) String() string { return AsString(n) } -func (n *CancelSession) String() string { return AsString(n) } +func (n *CancelSessions) String() string { return AsString(n) } func (n *CommitTransaction) String() string { return AsString(n) } func (n *CopyFrom) String() string { return AsString(n) } func (n *CreateChangefeed) String() string { return AsString(n) } diff --git a/pkg/sql/sem/tree/walk.go b/pkg/sql/sem/tree/walk.go index 632d013ffc5a..d9a795267374 100644 --- a/pkg/sql/sem/tree/walk.go +++ b/pkg/sql/sem/tree/walk.go @@ -799,6 +799,22 @@ func (stmt *CancelQueries) WalkStmt(v Visitor) Statement { return stmt } +// CopyNode makes a copy of this Statement without recursing in any child Statements. +func (stmt *CancelSessions) CopyNode() *CancelSessions { + stmtCopy := *stmt + return &stmtCopy +} + +// WalkStmt is part of the WalkableStmt interface. +func (stmt *CancelSessions) WalkStmt(v Visitor) Statement { + sel, changed := WalkStmt(v, stmt.Sessions) + if changed { + stmt = stmt.CopyNode() + stmt.Sessions = sel.(*Select) + } + return stmt +} + // CopyNode makes a copy of this Statement without recursing in any child Statements. func (stmt *Import) CopyNode() *Import { stmtCopy := *stmt @@ -1178,6 +1194,7 @@ var _ WalkableStmt = &SetVar{} var _ WalkableStmt = &Update{} var _ WalkableStmt = &ValuesClause{} var _ WalkableStmt = &CancelQueries{} +var _ WalkableStmt = &CancelSessions{} // WalkStmt walks the entire parsed stmt calling WalkExpr on each // expression, and replacing each expression with the one returned diff --git a/pkg/sql/walk.go b/pkg/sql/walk.go index 533d9fed5420..fe7be8807dff 100644 --- a/pkg/sql/walk.go +++ b/pkg/sql/walk.go @@ -504,10 +504,8 @@ func (v *planVisitor) visit(plan planNode) { case *cancelQueriesNode: v.visit(n.rows) - case *cancelSessionNode: - if v.observer.expr != nil { - v.expr(name, "sessionID", -1, n.sessionID) - } + case *cancelSessionsNode: + v.visit(n.rows) case *controlJobNode: if v.observer.expr != nil { @@ -563,7 +561,7 @@ var planNodeNames = map[reflect.Type]string{ reflect.TypeOf(&alterSequenceNode{}): "alter sequence", reflect.TypeOf(&alterUserSetPasswordNode{}): "alter user", reflect.TypeOf(&cancelQueriesNode{}): "cancel queries", - reflect.TypeOf(&cancelSessionNode{}): "cancel session", + reflect.TypeOf(&cancelSessionsNode{}): "cancel sessions", reflect.TypeOf(&controlJobNode{}): "control job", reflect.TypeOf(&createDatabaseNode{}): "create database", reflect.TypeOf(&createIndexNode{}): "create index", From 9f694b9b70323d3fcf8b541e86a252ccc0e1a2ba Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Sun, 29 Apr 2018 19:40:29 +0200 Subject: [PATCH 3/3] sql: make PAUSE/RESUME/CANCEL JOB (->JOBS) support multiple jobs Prior to this patch, `CANCEL/PAUSE/RESUME JOB` would accept a single expression as argument. This patch extends the syntax to accept an arbitrary number of job IDs, via a selection query. The syntax becomes `CANCEL/PAUSE/RESUME JOBS `. The syntax `CANCEL/PAUSE/RESUME JOB` with a single expression is preserved for backward compatibility, and is desugared to `CANCEL/PAUSE/RESUME JOBS VALUES (...)` during parsing. Commands are processed atomically. If a job's state cannot be updated, an error occurs and all the commands are cancelled (i.e. every job remains in its initial state). In addition, the statement now returns the number of job commands processed. Finally, the amount of boilerplate in the AST nodes is reduced, by using a single `ControlJobs` node type instead of separate `ResumeJob` / `CancelJob` / `PauseJob`. Release note (sql change): The `CANCEL/PAUSE/RESUME JOB` statements are extended with variants `CANCEL/PAUSE/RESUME JOBS` able to operate on multiple jobs at once. For example, to pause all running jobs run `PAUSE JOBS SELECT id FROM [SHOW JOBS] WHERE status = 'running'`. --- docs/generated/sql/bnf/stmt_block.bnf | 7 +- pkg/sql/cancel_queries.go | 7 +- pkg/sql/control_job.go | 117 ------------------ pkg/sql/control_jobs.go | 112 +++++++++++++++++ pkg/sql/expand_plan.go | 8 +- .../logictest/testdata/logic_test/run_control | 33 ++++- pkg/sql/opt_filters.go | 6 +- pkg/sql/opt_limits.go | 4 +- pkg/sql/opt_needed.go | 4 +- pkg/sql/parser/help_test.go | 7 +- pkg/sql/parser/parse_test.go | 21 ++-- pkg/sql/parser/sql.y | 82 ++++++++---- pkg/sql/pgwire/pgwire_test.go | 18 +++ pkg/sql/plan.go | 17 +-- pkg/sql/sem/tree/run_control.go | 45 ++++--- pkg/sql/sem/tree/stmt.go | 24 ++-- pkg/sql/sem/tree/walk.go | 17 +++ pkg/sql/walk.go | 8 +- 18 files changed, 315 insertions(+), 222 deletions(-) delete mode 100644 pkg/sql/control_job.go create mode 100644 pkg/sql/control_jobs.go diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index cf781525a97d..65cced778866 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -47,7 +47,7 @@ backup_stmt ::= 'BACKUP' targets 'TO' string_or_placeholder opt_as_of_clause opt_incremental opt_with_options cancel_stmt ::= - cancel_job_stmt + cancel_jobs_stmt | cancel_queries_stmt | cancel_sessions_stmt @@ -102,6 +102,7 @@ import_stmt ::= pause_stmt ::= 'PAUSE' 'JOB' a_expr + | 'PAUSE' 'JOBS' select_stmt prepare_stmt ::= 'PREPARE' table_alias_name prep_type_clause 'AS' preparable_stmt @@ -112,6 +113,7 @@ restore_stmt ::= resume_stmt ::= 'RESUME' 'JOB' a_expr + | 'RESUME' 'JOBS' select_stmt revoke_stmt ::= 'REVOKE' privileges 'ON' targets 'FROM' name_list @@ -217,8 +219,9 @@ opt_with_options ::= | 'WITH' 'OPTIONS' '(' kv_option_list ')' | -cancel_job_stmt ::= +cancel_jobs_stmt ::= 'CANCEL' 'JOB' a_expr + | 'CANCEL' 'JOBS' select_stmt cancel_queries_stmt ::= 'CANCEL' 'QUERY' a_expr diff --git a/pkg/sql/cancel_queries.go b/pkg/sql/cancel_queries.go index 044ad6b37963..6b1813ad626e 100644 --- a/pkg/sql/cancel_queries.go +++ b/pkg/sql/cancel_queries.go @@ -58,8 +58,13 @@ func (n *cancelQueriesNode) Next(params runParams) (bool, error) { return ok, err } + datum := n.rows.Values()[0] + if datum == tree.DNull { + return true, nil + } + statusServer := params.extendedEvalCtx.StatusServer - queryIDDatum := n.rows.Values()[0].(*tree.DString) + queryIDDatum := datum.(*tree.DString) queryIDString := string(*queryIDDatum) queryID, err := StringToClusterWideID(queryIDString) diff --git a/pkg/sql/control_job.go b/pkg/sql/control_job.go deleted file mode 100644 index 6405404fcbd9..000000000000 --- a/pkg/sql/control_job.go +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright 2017 The Cockroach Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -// implied. See the License for the specific language governing -// permissions and limitations under the License. - -package sql - -import ( - "context" - "fmt" - - "github.com/cockroachdb/cockroach/pkg/sql/jobs" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sem/types" -) - -type controlJobNode struct { - jobID tree.TypedExpr - desiredStatus jobs.Status -} - -func (p *planner) PauseJob(ctx context.Context, n *tree.PauseJob) (planNode, error) { - typedJobID, err := p.analyzeExpr( - ctx, - n.ID, - nil, - tree.IndexedVarHelper{}, - types.Int, - true, /* requireType */ - "PAUSE JOB", - ) - if err != nil { - return nil, err - } - - return &controlJobNode{ - jobID: typedJobID, - desiredStatus: jobs.StatusPaused, - }, nil -} - -func (p *planner) ResumeJob(ctx context.Context, n *tree.ResumeJob) (planNode, error) { - typedJobID, err := p.analyzeExpr( - ctx, - n.ID, - nil, - tree.IndexedVarHelper{}, - types.Int, - true, /* requireType */ - "RESUME JOB", - ) - if err != nil { - return nil, err - } - - return &controlJobNode{ - jobID: typedJobID, - desiredStatus: jobs.StatusRunning, - }, nil -} - -func (p *planner) CancelJob(ctx context.Context, n *tree.CancelJob) (planNode, error) { - typedJobID, err := p.analyzeExpr( - ctx, - n.ID, - nil, - tree.IndexedVarHelper{}, - types.Int, - true, /* requireType */ - "CANCEL JOB", - ) - if err != nil { - return nil, err - } - - return &controlJobNode{ - jobID: typedJobID, - desiredStatus: jobs.StatusCanceled, - }, nil -} - -func (n *controlJobNode) startExec(params runParams) error { - jobIDDatum, err := n.jobID.Eval(params.EvalContext()) - if err != nil { - return err - } - - jobID, ok := tree.AsDInt(jobIDDatum) - if !ok { - return fmt.Errorf("%s is not a valid job ID", jobIDDatum) - } - - reg := params.p.ExecCfg().JobRegistry - switch n.desiredStatus { - case jobs.StatusPaused: - return reg.Pause(params.ctx, params.p.txn, int64(jobID)) - case jobs.StatusRunning: - return reg.Resume(params.ctx, params.p.txn, int64(jobID)) - case jobs.StatusCanceled: - return reg.Cancel(params.ctx, params.p.txn, int64(jobID)) - default: - panic("unreachable") - } -} - -func (n *controlJobNode) Next(runParams) (bool, error) { return false, nil } -func (*controlJobNode) Values() tree.Datums { return nil } -func (*controlJobNode) Close(context.Context) {} diff --git a/pkg/sql/control_jobs.go b/pkg/sql/control_jobs.go new file mode 100644 index 000000000000..2e613f0f8510 --- /dev/null +++ b/pkg/sql/control_jobs.go @@ -0,0 +1,112 @@ +// Copyright 2017 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package sql + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/sql/jobs" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sem/types" + "github.com/pkg/errors" +) + +type controlJobsNode struct { + rows planNode + desiredStatus jobs.Status + numRows int +} + +var jobCommandToDesiredStatus = map[tree.JobCommand]jobs.Status{ + tree.CancelJob: jobs.StatusCanceled, + tree.ResumeJob: jobs.StatusRunning, + tree.PauseJob: jobs.StatusPaused, +} + +func (p *planner) ControlJobs(ctx context.Context, n *tree.ControlJobs) (planNode, error) { + rows, err := p.newPlan(ctx, n.Jobs, []types.T{types.Int}) + if err != nil { + return nil, err + } + cols := planColumns(rows) + if len(cols) != 1 { + return nil, errors.Errorf("%s JOBS expects a single column source, got %d columns", + tree.JobCommandToStatement[n.Command], len(cols)) + } + if !cols[0].Typ.Equivalent(types.Int) { + return nil, errors.Errorf("%s QUERIES requires int values, not type %s", + tree.JobCommandToStatement[n.Command], cols[0].Typ) + } + + return &controlJobsNode{ + rows: rows, + desiredStatus: jobCommandToDesiredStatus[n.Command], + }, nil +} + +// FastPathResults implements the planNodeFastPath inteface. +func (n *controlJobsNode) FastPathResults() (int, bool) { + return n.numRows, true +} + +// startExec implements the execStartable interface. +func (n *controlJobsNode) startExec(params runParams) error { + reg := params.p.ExecCfg().JobRegistry + for { + ok, err := n.rows.Next(params) + if err != nil { + return err + } + if !ok { + break + } + + datum := n.rows.Values()[0] + if datum == tree.DNull { + continue + } + + jobIDDatum := datum.(*tree.DInt) + jobID, ok := tree.AsDInt(jobIDDatum) + if !ok { + return fmt.Errorf("%s is not a valid job ID", jobIDDatum) + } + + switch n.desiredStatus { + case jobs.StatusPaused: + err = reg.Pause(params.ctx, params.p.txn, int64(jobID)) + case jobs.StatusRunning: + err = reg.Resume(params.ctx, params.p.txn, int64(jobID)) + case jobs.StatusCanceled: + err = reg.Cancel(params.ctx, params.p.txn, int64(jobID)) + default: + err = fmt.Errorf("programmer error: unhandled status %v", n.desiredStatus) + } + if err != nil { + return err + } + n.numRows++ + } + return nil +} + +func (*controlJobsNode) Next(runParams) (bool, error) { return false, nil } + +func (*controlJobsNode) Values() tree.Datums { return nil } + +func (n *controlJobsNode) Close(ctx context.Context) { + n.rows.Close(ctx) +} diff --git a/pkg/sql/expand_plan.go b/pkg/sql/expand_plan.go index f03c6fe55f77..c092f4eec71a 100644 --- a/pkg/sql/expand_plan.go +++ b/pkg/sql/expand_plan.go @@ -339,13 +339,15 @@ func doExpandPlan( case *cancelSessionsNode: n.rows, err = doExpandPlan(ctx, p, noParams, n.rows) + case *controlJobsNode: + n.rows, err = doExpandPlan(ctx, p, noParams, n.rows) + case *valuesNode: case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: case *scrubNode: - case *controlJobNode: case *createDatabaseNode: case *createIndexNode: case *CreateUserNode: @@ -822,13 +824,15 @@ func (p *planner) simplifyOrderings(plan planNode, usefulOrdering sqlbase.Column case *cancelSessionsNode: n.rows = p.simplifyOrderings(n.rows, nil) + case *controlJobsNode: + n.rows = p.simplifyOrderings(n.rows, nil) + case *valuesNode: case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: case *scrubNode: - case *controlJobNode: case *createDatabaseNode: case *createIndexNode: case *CreateUserNode: diff --git a/pkg/sql/logictest/testdata/logic_test/run_control b/pkg/sql/logictest/testdata/logic_test/run_control index e365139e1738..2ba686c60bb5 100644 --- a/pkg/sql/logictest/testdata/logic_test/run_control +++ b/pkg/sql/logictest/testdata/logic_test/run_control @@ -6,27 +6,54 @@ PAUSE JOB 1 query error could not parse "foo" as type int PAUSE JOB 'foo' -query error NULL is not a valid job ID +query error could not parse "foo" as type int +CANCEL JOBS SELECT 'foo' + +query error CANCEL JOBS expects a single column source, got 2 columns +CANCEL JOBS VALUES (1,2) + +statement ok count 0 PAUSE JOB (SELECT id FROM system.jobs LIMIT 0) +statement ok count 0 +PAUSE JOBS SELECT id FROM system.jobs LIMIT 0 + +query error could not parse "foo" as type int +PAUSE JOBS SELECT 'foo' + +query error PAUSE JOBS expects a single column source, got 2 columns +PAUSE JOBS VALUES (1,2) + query error job with ID 1 does not exist RESUME JOB 1 query error could not parse "foo" as type int RESUME JOB 'foo' -query error NULL is not a valid job ID +query error could not parse "foo" as type int +RESUME JOBS SELECT 'foo' + +query error RESUME JOBS expects a single column source, got 2 columns +RESUME JOBS VALUES (1,2) + +statement ok count 0 RESUME JOB (SELECT id FROM system.jobs LIMIT 0) +statement ok count 0 +RESUME JOBS SELECT id FROM system.jobs LIMIT 0 + query error job with ID 1 does not exist CANCEL JOB 1 query error could not parse "foo" as type int CANCEL JOB 'foo' -query error NULL is not a valid job ID +statement ok count 0 CANCEL JOB (SELECT id FROM system.jobs LIMIT 0) +statement ok count 0 +CANCEL JOBS SELECT id FROM system.jobs LIMIT 0 + query error CANCEL QUERIES requires string values, not type int CANCEL QUERY 1 diff --git a/pkg/sql/opt_filters.go b/pkg/sql/opt_filters.go index 50602b726fca..ec4e54c05c41 100644 --- a/pkg/sql/opt_filters.go +++ b/pkg/sql/opt_filters.go @@ -349,12 +349,16 @@ func (p *planner) propagateFilters( return plan, extraFilter, err } + case *controlJobsNode: + if n.rows, err = p.triggerFilterPropagation(ctx, n.rows); err != nil { + return plan, extraFilter, err + } + case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: case *scrubNode: - case *controlJobNode: case *createDatabaseNode: case *createIndexNode: case *CreateUserNode: diff --git a/pkg/sql/opt_limits.go b/pkg/sql/opt_limits.go index 14d82ba6b2f3..2fa099a2d19d 100644 --- a/pkg/sql/opt_limits.go +++ b/pkg/sql/opt_limits.go @@ -193,13 +193,15 @@ func (p *planner) applyLimit(plan planNode, numRows int64, soft bool) { case *cancelSessionsNode: p.setUnlimited(n.rows) + case *controlJobsNode: + p.setUnlimited(n.rows) + case *valuesNode: case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: case *scrubNode: - case *controlJobNode: case *createDatabaseNode: case *createIndexNode: case *CreateUserNode: diff --git a/pkg/sql/opt_needed.go b/pkg/sql/opt_needed.go index bf7f4eae341b..e2bf42efcd19 100644 --- a/pkg/sql/opt_needed.go +++ b/pkg/sql/opt_needed.go @@ -209,11 +209,13 @@ func setNeededColumns(plan planNode, needed []bool) { case *cancelSessionsNode: setNeededColumns(n.rows, allColumns(n.rows)) + case *controlJobsNode: + setNeededColumns(n.rows, allColumns(n.rows)) + case *alterIndexNode: case *alterTableNode: case *alterSequenceNode: case *alterUserSetPasswordNode: - case *controlJobNode: case *scrubNode: case *createDatabaseNode: case *createIndexNode: diff --git a/pkg/sql/parser/help_test.go b/pkg/sql/parser/help_test.go index 9af2e319e218..869510e24bee 100644 --- a/pkg/sql/parser/help_test.go +++ b/pkg/sql/parser/help_test.go @@ -71,7 +71,8 @@ func TestContextualHelp(t *testing.T) { {`ALTER USER foo WITH PASSWORD ??`, `ALTER USER`}, {`CANCEL ??`, `CANCEL`}, - {`CANCEL JOB ??`, `CANCEL JOB`}, + {`CANCEL JOB ??`, `CANCEL JOBS`}, + {`CANCEL JOBS ??`, `CANCEL JOBS`}, {`CANCEL QUERY ??`, `CANCEL QUERIES`}, {`CANCEL QUERY IF ??`, `CANCEL QUERIES`}, {`CANCEL QUERY IF EXISTS ??`, `CANCEL QUERIES`}, @@ -203,9 +204,9 @@ func TestContextualHelp(t *testing.T) { {`GRANT ALL ON foo TO ??`, `GRANT`}, {`GRANT ALL ON foo TO bar ??`, `GRANT`}, - {`PAUSE ??`, `PAUSE JOB`}, + {`PAUSE ??`, `PAUSE JOBS`}, - {`RESUME ??`, `RESUME JOB`}, + {`RESUME ??`, `RESUME JOBS`}, {`REVOKE ALL ??`, `REVOKE`}, {`REVOKE ALL ON foo FROM ??`, `REVOKE`}, diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index 6f26d0683488..9ecd5f3dbd42 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -310,13 +310,13 @@ func TestParse(t *testing.T) { {`DROP SEQUENCE a.b CASCADE`}, {`DROP SEQUENCE a, b CASCADE`}, - {`CANCEL JOB a`}, + {`CANCEL JOBS SELECT a`}, {`CANCEL QUERIES SELECT a`}, {`CANCEL SESSIONS SELECT a`}, {`CANCEL QUERIES IF EXISTS SELECT a`}, {`CANCEL SESSIONS IF EXISTS SELECT a`}, - {`RESUME JOB a`}, - {`PAUSE JOB a`}, + {`RESUME JOBS SELECT a`}, + {`PAUSE JOBS SELECT a`}, {`EXPLAIN SELECT 1`}, {`EXPLAIN EXPLAIN SELECT 1`}, @@ -423,12 +423,12 @@ func TestParse(t *testing.T) { {`PREPARE a (STRING) AS CANCEL SESSIONS SELECT $1`}, {`PREPARE a AS CANCEL SESSIONS IF EXISTS SELECT 1`}, {`PREPARE a (STRING) AS CANCEL SESSIONS IF EXISTS SELECT $1`}, - {`PREPARE a AS CANCEL JOB 1`}, - {`PREPARE a (INT) AS CANCEL JOB $1`}, - {`PREPARE a AS PAUSE JOB 1`}, - {`PREPARE a (INT) AS PAUSE JOB $1`}, - {`PREPARE a AS RESUME JOB 1`}, - {`PREPARE a (INT) AS RESUME JOB $1`}, + {`PREPARE a AS CANCEL JOBS SELECT 1`}, + {`PREPARE a (INT) AS CANCEL JOBS SELECT $1`}, + {`PREPARE a AS PAUSE JOBS SELECT 1`}, + {`PREPARE a (INT) AS PAUSE JOBS SELECT $1`}, + {`PREPARE a AS RESUME JOBS SELECT 1`}, + {`PREPARE a (INT) AS RESUME JOBS SELECT $1`}, {`PREPARE a AS IMPORT TABLE a CREATE USING 'b' CSV DATA ('c') WITH temp = 'd'`}, {`PREPARE a (STRING, STRING, STRING) AS IMPORT TABLE a CREATE USING $1 CSV DATA ($2) WITH temp = $3`}, @@ -1291,6 +1291,9 @@ func TestParse2(t *testing.T) { {`DEALLOCATE PREPARE ALL`, `DEALLOCATE ALL`}, + {`CANCEL JOB a`, `CANCEL JOBS VALUES (a)`}, + {`RESUME JOB a`, `RESUME JOBS VALUES (a)`}, + {`PAUSE JOB a`, `PAUSE JOBS VALUES (a)`}, {`CANCEL QUERY a`, `CANCEL QUERIES VALUES (a)`}, {`CANCEL QUERY IF EXISTS a`, `CANCEL QUERIES IF EXISTS VALUES (a)`}, {`CANCEL SESSION a`, `CANCEL SESSIONS VALUES (a)`}, diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 58d440b4c78e..a135a6556490 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -605,7 +605,7 @@ func newNameFromStr(s string) *tree.Name { %type begin_stmt %type cancel_stmt -%type cancel_job_stmt +%type cancel_jobs_stmt %type cancel_queries_stmt %type cancel_sessions_stmt @@ -1041,10 +1041,10 @@ stmt: | grant_stmt // EXTEND WITH HELP: GRANT | insert_stmt // EXTEND WITH HELP: INSERT | import_stmt // EXTEND WITH HELP: IMPORT -| pause_stmt // EXTEND WITH HELP: PAUSE JOB +| pause_stmt // EXTEND WITH HELP: PAUSE JOBS | prepare_stmt // EXTEND WITH HELP: PREPARE | restore_stmt // EXTEND WITH HELP: RESTORE -| resume_stmt // EXTEND WITH HELP: RESUME JOB +| resume_stmt // EXTEND WITH HELP: RESUME JOBS | revoke_stmt // EXTEND WITH HELP: REVOKE | savepoint_stmt // EXTEND WITH HELP: SAVEPOINT | scrub_stmt // help texts in sub-rule @@ -1705,23 +1705,35 @@ copy_from_stmt: // %Help: CANCEL // %Category: Group -// %Text: CANCEL JOB, CANCEL QUERIES, CANCEL SESSIONS +// %Text: CANCEL JOBS, CANCEL QUERIES, CANCEL SESSIONS cancel_stmt: - cancel_job_stmt // EXTEND WITH HELP: CANCEL JOB + cancel_jobs_stmt // EXTEND WITH HELP: CANCEL JOBS | cancel_queries_stmt // EXTEND WITH HELP: CANCEL QUERIES | cancel_sessions_stmt // EXTEND WITH HELP: CANCEL SESSIONS | CANCEL error // SHOW HELP: CANCEL -// %Help: CANCEL JOB - cancel a background job +// %Help: CANCEL JOBS - cancel background jobs // %Category: Misc -// %Text: CANCEL JOB -// %SeeAlso: SHOW JOBS, PAUSE JOBS, RESUME JOB -cancel_job_stmt: +// %Text: +// CANCEL JOBS +// CANCEL JOB +// %SeeAlso: SHOW JOBS, PAUSE JOBS, RESUME JOBS +cancel_jobs_stmt: CANCEL JOB a_expr { - $$.val = &tree.CancelJob{ID: $3.expr()} + $$.val = &tree.ControlJobs{ + Jobs: &tree.Select{ + Select: &tree.ValuesClause{Tuples: []*tree.Tuple{{Exprs: tree.Exprs{$3.expr()}}}}, + }, + Command: tree.CancelJob, + } } -| CANCEL JOB error // SHOW HELP: CANCEL JOB +| CANCEL JOB error // SHOW HELP: CANCEL JOBS +| CANCEL JOBS select_stmt + { + $$.val = &tree.ControlJobs{Jobs: $3.slct(), Command: tree.CancelJob} + } +| CANCEL JOBS error // SHOW HELP: CANCEL JOBS // %Help: CANCEL QUERIES - cancel running queries // %Category: Misc @@ -2093,10 +2105,10 @@ preparable_stmt: | drop_user_stmt // EXTEND WITH HELP: DROP USER | import_stmt // EXTEND WITH HELP: IMPORT | insert_stmt // EXTEND WITH HELP: INSERT -| pause_stmt // EXTEND WITH HELP: PAUSE JOB +| pause_stmt // EXTEND WITH HELP: PAUSE JOBS | reset_stmt // help texts in sub-rule | restore_stmt // EXTEND WITH HELP: RESTORE -| resume_stmt // EXTEND WITH HELP: RESUME JOB +| resume_stmt // EXTEND WITH HELP: RESUME JOBS | select_stmt // help texts in sub-rule { $$.val = $1.slct() @@ -2909,7 +2921,7 @@ show_queries_stmt: // %Help: SHOW JOBS - list background jobs // %Category: Misc // %Text: SHOW JOBS -// %SeeAlso: CANCEL JOB, PAUSE JOB, RESUME JOB +// %SeeAlso: CANCEL JOBS, PAUSE JOBS, RESUME JOBS show_jobs_stmt: SHOW JOBS { @@ -3383,16 +3395,27 @@ for_grantee_clause: $$.val = tree.NameList(nil) } -// %Help: PAUSE JOB - pause a background job +// %Help: PAUSE JOBS - pause background jobs // %Category: Misc -// %Text: PAUSE JOB -// %SeeAlso: SHOW JOBS, CANCEL JOB, RESUME JOB +// %Text: +// PAUSE JOBS +// PAUSE JOB +// %SeeAlso: SHOW JOBS, CANCEL JOBS, RESUME JOBS pause_stmt: PAUSE JOB a_expr { - $$.val = &tree.PauseJob{ID: $3.expr()} + $$.val = &tree.ControlJobs{ + Jobs: &tree.Select{ + Select: &tree.ValuesClause{Tuples: []*tree.Tuple{{Exprs: tree.Exprs{$3.expr()}}}}, + }, + Command: tree.PauseJob, + } + } +| PAUSE JOBS select_stmt + { + $$.val = &tree.ControlJobs{Jobs: $3.slct(), Command: tree.PauseJob} } -| PAUSE error // SHOW HELP: PAUSE JOB +| PAUSE error // SHOW HELP: PAUSE JOBS // %Help: CREATE TABLE - create a new table // %Category: DDL @@ -4281,16 +4304,27 @@ release_stmt: } | RELEASE error // SHOW HELP: RELEASE -// %Help: RESUME JOB - resume a background job +// %Help: RESUME JOBS - resume background jobs // %Category: Misc -// %Text: RESUME JOB -// %SeeAlso: SHOW JOBS, CANCEL JOB, PAUSE JOB +// %Text: +// RESUME JOBS +// RESUME JOB +// %SeeAlso: SHOW JOBS, CANCEL JOBS, PAUSE JOBS resume_stmt: RESUME JOB a_expr { - $$.val = &tree.ResumeJob{ID: $3.expr()} + $$.val = &tree.ControlJobs{ + Jobs: &tree.Select{ + Select: &tree.ValuesClause{Tuples: []*tree.Tuple{{Exprs: tree.Exprs{$3.expr()}}}}, + }, + Command: tree.ResumeJob, + } + } +| RESUME JOBS select_stmt + { + $$.val = &tree.ControlJobs{Jobs: $3.slct(), Command: tree.ResumeJob} } -| RESUME error // SHOW HELP: RESUME JOB +| RESUME error // SHOW HELP: RESUME JOBS // %Help: SAVEPOINT - start a retryable block // %Category: Txn diff --git a/pkg/sql/pgwire/pgwire_test.go b/pkg/sql/pgwire/pgwire_test.go index c06ab80c63d4..6ff95dd7aeef 100644 --- a/pkg/sql/pgwire/pgwire_test.go +++ b/pkg/sql/pgwire/pgwire_test.go @@ -1236,18 +1236,36 @@ func TestPGPreparedExec(t *testing.T) { baseTest.SetArgs(123).Error("pq: job with ID 123 does not exist"), }, }, + { + "CANCEL JOBS SELECT $1", + []preparedExecTest{ + baseTest.SetArgs(123).Error("pq: job with ID 123 does not exist"), + }, + }, { "RESUME JOB $1", []preparedExecTest{ baseTest.SetArgs(123).Error("pq: job with ID 123 does not exist"), }, }, + { + "RESUME JOBS SELECT $1", + []preparedExecTest{ + baseTest.SetArgs(123).Error("pq: job with ID 123 does not exist"), + }, + }, { "PAUSE JOB $1", []preparedExecTest{ baseTest.SetArgs(123).Error("pq: job with ID 123 does not exist"), }, }, + { + "PAUSE JOBS SELECT $1", + []preparedExecTest{ + baseTest.SetArgs(123).Error("pq: job with ID 123 does not exist"), + }, + }, { "CANCEL QUERY $1", []preparedExecTest{ diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 442b8da44055..dddcb8780561 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -214,6 +214,7 @@ var _ planNodeFastPath = &deleteNode{} var _ planNodeFastPath = &rowCountNode{} var _ planNodeFastPath = &serializeNode{} var _ planNodeFastPath = &setZoneConfigNode{} +var _ planNodeFastPath = &controlJobsNode{} // planNodeRequireSpool serves as marker for nodes whose parent must // ensure that the node is fully run to completion (and the results @@ -641,8 +642,8 @@ func (p *planner) newPlan( return p.CancelQueries(ctx, n) case *tree.CancelSessions: return p.CancelSessions(ctx, n) - case *tree.CancelJob: - return p.CancelJob(ctx, n) + case *tree.ControlJobs: + return p.ControlJobs(ctx, n) case *tree.Scrub: return p.Scrub(ctx, n) case *tree.CreateDatabase: @@ -687,8 +688,6 @@ func (p *planner) newPlan( return p.Insert(ctx, n, desiredTypes) case *tree.ParenSelect: return p.newPlan(ctx, n.Select, desiredTypes) - case *tree.PauseJob: - return p.PauseJob(ctx, n) case *tree.TestingRelocate: return p.TestingRelocate(ctx, n) case *tree.RenameColumn: @@ -699,8 +698,6 @@ func (p *planner) newPlan( return p.RenameIndex(ctx, n) case *tree.RenameTable: return p.RenameTable(ctx, n) - case *tree.ResumeJob: - return p.ResumeJob(ctx, n) case *tree.Revoke: return p.Revoke(ctx, n) case *tree.Scatter: @@ -824,8 +821,8 @@ func (p *planner) doPrepare(ctx context.Context, stmt tree.Statement) (planNode, return p.CancelQueries(ctx, n) case *tree.CancelSessions: return p.CancelSessions(ctx, n) - case *tree.CancelJob: - return p.CancelJob(ctx, n) + case *tree.ControlJobs: + return p.ControlJobs(ctx, n) case *tree.CreateUser: return p.CreateUser(ctx, n) case *tree.CreateTable: @@ -838,10 +835,6 @@ func (p *planner) doPrepare(ctx context.Context, stmt tree.Statement) (planNode, return p.Explain(ctx, n) case *tree.Insert: return p.Insert(ctx, n, nil) - case *tree.PauseJob: - return p.PauseJob(ctx, n) - case *tree.ResumeJob: - return p.ResumeJob(ctx, n) case *tree.Select: return p.Select(ctx, n, nil) case *tree.SelectClause: diff --git a/pkg/sql/sem/tree/run_control.go b/pkg/sql/sem/tree/run_control.go index 0893d4c00cac..59088eeb588e 100644 --- a/pkg/sql/sem/tree/run_control.go +++ b/pkg/sql/sem/tree/run_control.go @@ -14,37 +14,34 @@ package tree -// PauseJob represents a PAUSE JOB statement. -type PauseJob struct { - ID Expr +// ControlJobs represents a PAUSE/RESUME/CANCEL JOBS statement. +type ControlJobs struct { + Jobs *Select + Command JobCommand } -// Format implements the NodeFormatter interface. -func (node *PauseJob) Format(ctx *FmtCtx) { - ctx.WriteString("PAUSE JOB ") - ctx.FormatNode(node.ID) -} - -// ResumeJob represents a RESUME JOB statement. -type ResumeJob struct { - ID Expr -} +// JobCommand determines which type of action to effect on the selected job(s). +type JobCommand int -// Format implements the NodeFormatter interface. -func (node *ResumeJob) Format(ctx *FmtCtx) { - ctx.WriteString("RESUME JOB ") - ctx.FormatNode(node.ID) -} +// JobCommand values +const ( + PauseJob JobCommand = iota + CancelJob + ResumeJob +) -// CancelJob represents a CANCEL JOB statement. -type CancelJob struct { - ID Expr +// JobCommandToStatement translates a job command integer to a statement prefix. +var JobCommandToStatement = map[JobCommand]string{ + PauseJob: "PAUSE", + CancelJob: "CANCEL", + ResumeJob: "RESUME", } // Format implements the NodeFormatter interface. -func (node *CancelJob) Format(ctx *FmtCtx) { - ctx.WriteString("CANCEL JOB ") - ctx.FormatNode(node.ID) +func (n *ControlJobs) Format(ctx *FmtCtx) { + ctx.WriteString(JobCommandToStatement[n.Command]) + ctx.WriteString(" JOBS ") + ctx.FormatNode(n.Jobs) } // CancelQueries represents a CANCEL QUERIES statement. diff --git a/pkg/sql/sem/tree/stmt.go b/pkg/sql/sem/tree/stmt.go index 013f3d550d67..823fb77d004d 100644 --- a/pkg/sql/sem/tree/stmt.go +++ b/pkg/sql/sem/tree/stmt.go @@ -207,10 +207,14 @@ func (*BeginTransaction) StatementTag() string { return "BEGIN" } func (*BeginTransaction) hiddenFromStats() {} // StatementType implements the Statement interface. -func (*CancelJob) StatementType() StatementType { return Ack } +func (*ControlJobs) StatementType() StatementType { return RowsAffected } // StatementTag returns a short string identifying the type of statement. -func (*CancelJob) StatementTag() string { return "CANCEL JOB" } +func (n *ControlJobs) StatementTag() string { + return fmt.Sprintf("%s JOBS", JobCommandToStatement[n.Command]) +} + +func (*ControlJobs) independentFromParallelizedPriors() {} // StatementType implements the Statement interface. func (*CancelQueries) StatementType() StatementType { return RowsAffected } @@ -435,12 +439,6 @@ func (*ParenSelect) StatementType() StatementType { return Rows } // StatementTag returns a short string identifying the type of statement. func (*ParenSelect) StatementTag() string { return "SELECT" } -// StatementType implements the Statement interface. -func (*PauseJob) StatementType() StatementType { return Ack } - -// StatementTag returns a short string identifying the type of statement. -func (*PauseJob) StatementTag() string { return "PAUSE JOB" } - // StatementType implements the Statement interface. func (*Prepare) StatementType() StatementType { return Ack } @@ -502,12 +500,6 @@ func (*Restore) StatementTag() string { return "RESTORE" } func (*Restore) hiddenFromShowQueries() {} -// StatementType implements the Statement interface. -func (*ResumeJob) StatementType() StatementType { return Ack } - -// StatementTag returns a short string identifying the type of statement. -func (*ResumeJob) StatementTag() string { return "RESUME JOB" } - // StatementType implements the Statement interface. func (*Revoke) StatementType() StatementType { return DDL } @@ -888,7 +880,7 @@ func (n *AlterUserSetPassword) String() string { return AsString(n) } func (n *AlterSequence) String() string { return AsString(n) } func (n *Backup) String() string { return AsString(n) } func (n *BeginTransaction) String() string { return AsString(n) } -func (n *CancelJob) String() string { return AsString(n) } +func (n *ControlJobs) String() string { return AsString(n) } func (n *CancelQueries) String() string { return AsString(n) } func (n *CancelSessions) String() string { return AsString(n) } func (n *CommitTransaction) String() string { return AsString(n) } @@ -919,7 +911,6 @@ func (n *GrantRole) String() string { return AsString(n) } func (n *Insert) String() string { return AsString(n) } func (n *Import) String() string { return AsString(n) } func (n *ParenSelect) String() string { return AsString(n) } -func (n *PauseJob) String() string { return AsString(n) } func (n *Prepare) String() string { return AsString(n) } func (n *ReleaseSavepoint) String() string { return AsString(n) } func (n *TestingRelocate) String() string { return AsString(n) } @@ -928,7 +919,6 @@ func (n *RenameDatabase) String() string { return AsString(n) } func (n *RenameIndex) String() string { return AsString(n) } func (n *RenameTable) String() string { return AsString(n) } func (n *Restore) String() string { return AsString(n) } -func (n *ResumeJob) String() string { return AsString(n) } func (n *Revoke) String() string { return AsString(n) } func (n *RevokeRole) String() string { return AsString(n) } func (n *RollbackToSavepoint) String() string { return AsString(n) } diff --git a/pkg/sql/sem/tree/walk.go b/pkg/sql/sem/tree/walk.go index d9a795267374..bedfb191a093 100644 --- a/pkg/sql/sem/tree/walk.go +++ b/pkg/sql/sem/tree/walk.go @@ -815,6 +815,22 @@ func (stmt *CancelSessions) WalkStmt(v Visitor) Statement { return stmt } +// CopyNode makes a copy of this Statement without recursing in any child Statements. +func (stmt *ControlJobs) CopyNode() *ControlJobs { + stmtCopy := *stmt + return &stmtCopy +} + +// WalkStmt is part of the WalkableStmt interface. +func (stmt *ControlJobs) WalkStmt(v Visitor) Statement { + sel, changed := WalkStmt(v, stmt.Jobs) + if changed { + stmt = stmt.CopyNode() + stmt.Jobs = sel.(*Select) + } + return stmt +} + // CopyNode makes a copy of this Statement without recursing in any child Statements. func (stmt *Import) CopyNode() *Import { stmtCopy := *stmt @@ -1195,6 +1211,7 @@ var _ WalkableStmt = &Update{} var _ WalkableStmt = &ValuesClause{} var _ WalkableStmt = &CancelQueries{} var _ WalkableStmt = &CancelSessions{} +var _ WalkableStmt = &ControlJobs{} // WalkStmt walks the entire parsed stmt calling WalkExpr on each // expression, and replacing each expression with the one returned diff --git a/pkg/sql/walk.go b/pkg/sql/walk.go index fe7be8807dff..5b92ca498641 100644 --- a/pkg/sql/walk.go +++ b/pkg/sql/walk.go @@ -507,10 +507,8 @@ func (v *planVisitor) visit(plan planNode) { case *cancelSessionsNode: v.visit(n.rows) - case *controlJobNode: - if v.observer.expr != nil { - v.expr(name, "jobID", -1, n.jobID) - } + case *controlJobsNode: + v.visit(n.rows) } } @@ -562,7 +560,7 @@ var planNodeNames = map[reflect.Type]string{ reflect.TypeOf(&alterUserSetPasswordNode{}): "alter user", reflect.TypeOf(&cancelQueriesNode{}): "cancel queries", reflect.TypeOf(&cancelSessionsNode{}): "cancel sessions", - reflect.TypeOf(&controlJobNode{}): "control job", + reflect.TypeOf(&controlJobsNode{}): "control jobs", reflect.TypeOf(&createDatabaseNode{}): "create database", reflect.TypeOf(&createIndexNode{}): "create index", reflect.TypeOf(&createTableNode{}): "create table",