Skip to content

Commit

Permalink
Merge #25157
Browse files Browse the repository at this point in the history
25157: sql: simplify+generalize control statements r=knz a=knz

The following statements gain the ability to operate on multiple entries at once:

- `CANCEL SESSIONS`
- `CANCEL QUERIES`
- `CANCEL JOBS`
- `RESUME JOBS`
- `PAUSE JOBS`

See individual commits for details.

Fixes #22907.

cc @cockroachdb/sql 

Co-authored-by: Raphael 'kena' Poss <[email protected]>
  • Loading branch information
craig[bot] and knz committed May 1, 2018
2 parents 1ebf55b + 9f694b9 commit f42d65f
Show file tree
Hide file tree
Showing 23 changed files with 681 additions and 373 deletions.
4 changes: 3 additions & 1 deletion docs/generated/sql/bnf/cancel_query.bnf
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
cancel_query_stmt ::=
cancel_queries_stmt ::=
'CANCEL' 'QUERY' query_id
| 'CANCEL' 'QUERY' 'IF' 'EXISTS' query_id
| 'CANCEL' 'QUERIES' select_stmt
| 'CANCEL' 'QUERIES' 'IF' 'EXISTS' select_stmt
5 changes: 5 additions & 0 deletions docs/generated/sql/bnf/cancel_session.bnf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
cancel_sessions_stmt ::=
'CANCEL' 'SESSION' session_id
| 'CANCEL' 'SESSION' 'IF' 'EXISTS' session_id
| 'CANCEL' 'SESSIONS' select_stmt
| 'CANCEL' 'SESSIONS' 'IF' 'EXISTS' select_stmt
19 changes: 13 additions & 6 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ backup_stmt ::=
'BACKUP' targets 'TO' string_or_placeholder opt_as_of_clause opt_incremental opt_with_options

cancel_stmt ::=
cancel_job_stmt
| cancel_query_stmt
| cancel_session_stmt
cancel_jobs_stmt
| cancel_queries_stmt
| cancel_sessions_stmt

copy_from_stmt ::=
'COPY' table_name opt_column_list 'FROM' 'STDIN'
Expand Down Expand Up @@ -102,6 +102,7 @@ import_stmt ::=

pause_stmt ::=
'PAUSE' 'JOB' a_expr
| 'PAUSE' 'JOBS' select_stmt

prepare_stmt ::=
'PREPARE' table_alias_name prep_type_clause 'AS' preparable_stmt
Expand All @@ -112,6 +113,7 @@ restore_stmt ::=

resume_stmt ::=
'RESUME' 'JOB' a_expr
| 'RESUME' 'JOBS' select_stmt

revoke_stmt ::=
'REVOKE' privileges 'ON' targets 'FROM' name_list
Expand Down Expand Up @@ -217,16 +219,21 @@ opt_with_options ::=
| 'WITH' 'OPTIONS' '(' kv_option_list ')'
|

cancel_job_stmt ::=
cancel_jobs_stmt ::=
'CANCEL' 'JOB' a_expr
| 'CANCEL' 'JOBS' select_stmt

cancel_query_stmt ::=
cancel_queries_stmt ::=
'CANCEL' 'QUERY' a_expr
| 'CANCEL' 'QUERY' 'IF' 'EXISTS' a_expr
| 'CANCEL' 'QUERIES' select_stmt
| 'CANCEL' 'QUERIES' 'IF' 'EXISTS' select_stmt

cancel_session_stmt ::=
cancel_sessions_stmt ::=
'CANCEL' 'SESSION' a_expr
| 'CANCEL' 'SESSION' 'IF' 'EXISTS' a_expr
| 'CANCEL' 'SESSIONS' select_stmt
| 'CANCEL' 'SESSIONS' 'IF' 'EXISTS' select_stmt

table_name ::=
db_object_name
Expand Down
3 changes: 2 additions & 1 deletion pkg/cmd/docgen/diagrams.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,8 @@ var specs = []stmtSpec{
inline: []string{"opt_transaction"},
match: []*regexp.Regexp{regexp.MustCompile("'COMMIT'|'END'")},
},
{name: "cancel_query", stmt: "cancel_query_stmt", replace: map[string]string{"a_expr": "query_id"}, unlink: []string{"query_id"}},
{name: "cancel_query", stmt: "cancel_queries_stmt", replace: map[string]string{"a_expr": "query_id"}, unlink: []string{"query_id"}},
{name: "cancel_session", stmt: "cancel_sessions_stmt", replace: map[string]string{"a_expr": "session_id"}, unlink: []string{"session_id"}},
{name: "create_database_stmt", inline: []string{"opt_encoding_clause"}, replace: map[string]string{"'SCONST'": "encoding"}, unlink: []string{"name", "encoding"}},
{
name: "create_index_stmt",
Expand Down
64 changes: 37 additions & 27 deletions pkg/sql/cancel_query.go → pkg/sql/cancel_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,43 +25,51 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/types"
)

type cancelQueryNode struct {
queryID tree.TypedExpr
type cancelQueriesNode struct {
rows planNode
ifExists bool
}

func (p *planner) CancelQuery(ctx context.Context, n *tree.CancelQuery) (planNode, error) {
typedQueryID, err := p.analyzeExpr(
ctx,
n.ID,
nil,
tree.IndexedVarHelper{},
types.String,
true, /* requireType */
"CANCEL QUERY",
)
func (p *planner) CancelQueries(ctx context.Context, n *tree.CancelQueries) (planNode, error) {
rows, err := p.newPlan(ctx, n.Queries, []types.T{types.String})
if err != nil {
return nil, err
}
cols := planColumns(rows)
if len(cols) != 1 {
return nil, errors.Errorf("CANCEL QUERIES expects a single column source, got %d columns", len(cols))
}
if !cols[0].Typ.Equivalent(types.String) {
return nil, errors.Errorf("CANCEL QUERIES requires string values, not type %s", cols[0].Typ)
}

return &cancelQueryNode{
queryID: typedQueryID,
return &cancelQueriesNode{
rows: rows,
ifExists: n.IfExists,
}, nil
}

func (n *cancelQueryNode) startExec(params runParams) error {
statusServer := params.extendedEvalCtx.StatusServer
func (n *cancelQueriesNode) Next(params runParams) (bool, error) {
// TODO(knz): instead of performing the cancels sequentially,
// accumulate all the query IDs and then send batches to each of the
// nodes.

queryIDDatum, err := n.queryID.Eval(params.EvalContext())
if err != nil {
return err
if ok, err := n.rows.Next(params); err != nil || !ok {
return ok, err
}

datum := n.rows.Values()[0]
if datum == tree.DNull {
return true, nil
}

queryIDString := tree.AsStringWithFlags(queryIDDatum, tree.FmtBareStrings)
statusServer := params.extendedEvalCtx.StatusServer
queryIDDatum := datum.(*tree.DString)

queryIDString := string(*queryIDDatum)
queryID, err := StringToClusterWideID(queryIDString)
if err != nil {
return errors.Wrapf(err, "invalid query ID '%s'", queryIDString)
return false, errors.Wrapf(err, "invalid query ID '%s'", queryIDString)
}

// Get the lowest 32 bits of the query ID.
Expand All @@ -75,16 +83,18 @@ func (n *cancelQueryNode) startExec(params runParams) error {

response, err := statusServer.CancelQuery(params.ctx, request)
if err != nil {
return err
return false, err
}

if !response.Canceled && !n.ifExists {
return fmt.Errorf("could not cancel query %s: %s", queryID, response.Error)
return false, fmt.Errorf("could not cancel query %s: %s", queryID, response.Error)
}

return nil
return true, nil
}

func (n *cancelQueryNode) Next(runParams) (bool, error) { return false, nil }
func (*cancelQueryNode) Values() tree.Datums { return nil }
func (*cancelQueryNode) Close(context.Context) {}
func (*cancelQueriesNode) Values() tree.Datums { return nil }

func (n *cancelQueriesNode) Close(ctx context.Context) {
n.rows.Close(ctx)
}
68 changes: 39 additions & 29 deletions pkg/sql/cancel_session.go → pkg/sql/cancel_sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,43 +24,51 @@ import (
"github.com/pkg/errors"
)

type cancelSessionNode struct {
sessionID tree.TypedExpr
ifExists bool
type cancelSessionsNode struct {
rows planNode
ifExists bool
}

func (p *planner) CancelSession(ctx context.Context, n *tree.CancelSession) (planNode, error) {
typedSessionID, err := p.analyzeExpr(
ctx,
n.ID,
nil,
tree.IndexedVarHelper{},
types.String,
true, /* requireType */
"CANCEL SESSION",
)
func (p *planner) CancelSessions(ctx context.Context, n *tree.CancelSessions) (planNode, error) {
rows, err := p.newPlan(ctx, n.Sessions, []types.T{types.String})
if err != nil {
return nil, err
}
cols := planColumns(rows)
if len(cols) != 1 {
return nil, errors.Errorf("CANCEL SESSIONS expects a single column source, got %d columns", len(cols))
}
if !cols[0].Typ.Equivalent(types.String) {
return nil, errors.Errorf("CANCEL SESSIONS requires string values, not type %s", cols[0].Typ)
}

return &cancelSessionNode{
sessionID: typedSessionID,
ifExists: n.IfExists,
return &cancelSessionsNode{
rows: rows,
ifExists: n.IfExists,
}, nil
}

func (n *cancelSessionNode) startExec(params runParams) error {
statusServer := params.extendedEvalCtx.StatusServer
func (n *cancelSessionsNode) Next(params runParams) (bool, error) {
// TODO(knz): instead of performing the cancels sequentially,
// accumulate all the session IDs and then send batches to each of the
// nodes.

sessionIDDatum, err := n.sessionID.Eval(params.EvalContext())
if err != nil {
return err
if ok, err := n.rows.Next(params); err != nil || !ok {
return ok, err
}

datum := n.rows.Values()[0]
if datum == tree.DNull {
return true, nil
}

sessionIDString := tree.AsStringWithFlags(sessionIDDatum, tree.FmtBareStrings)
statusServer := params.extendedEvalCtx.StatusServer
sessionIDDatum := datum.(*tree.DString)

sessionIDString := string(*sessionIDDatum)
sessionID, err := StringToClusterWideID(sessionIDString)
if err != nil {
return errors.Wrapf(err, "invalid session ID '%s'", sessionIDString)
return false, errors.Wrapf(err, "invalid session ID '%s'", sessionIDString)
}

// Get the lowest 32 bits of the session ID.
Expand All @@ -74,16 +82,18 @@ func (n *cancelSessionNode) startExec(params runParams) error {

response, err := statusServer.CancelSession(params.ctx, request)
if err != nil {
return err
return false, err
}

if !response.Canceled && !n.ifExists {
return fmt.Errorf("could not cancel session %s: %s", sessionID, response.Error)
return false, fmt.Errorf("could not cancel session %s: %s", sessionID, response.Error)
}

return nil
return true, nil
}

func (n *cancelSessionNode) Next(runParams) (bool, error) { return false, nil }
func (*cancelSessionNode) Values() tree.Datums { return nil }
func (*cancelSessionNode) Close(context.Context) {}
func (*cancelSessionsNode) Values() tree.Datums { return nil }

func (n *cancelSessionsNode) Close(ctx context.Context) {
n.rows.Close(ctx)
}
117 changes: 0 additions & 117 deletions pkg/sql/control_job.go

This file was deleted.

Loading

0 comments on commit f42d65f

Please sign in to comment.