Skip to content

Commit

Permalink
sql: add restrictions for pausable portals
Browse files Browse the repository at this point in the history
This commits add the following restrictions for pausable portals:

1. Not an internal queries
2. Read-only queries
3. No sub-quereis or post-queries
4. Local plan only

This is because the current changes to the consumer-receiver model only consider
the local push-based case.

Release note: None
  • Loading branch information
ZhouXing19 committed Mar 15, 2023
1 parent a2bd230 commit 4197e9b
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 7 deletions.
18 changes: 17 additions & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1431,9 +1431,15 @@ func (ex *connExecutor) dispatchToExecutionEngine(
}

ex.sessionTracing.TracePlanCheckStart(ctx)

distSQLMode := ex.sessionData().DistSQLMode
// We only allow non-distributed plan for pausable portals.
if planner.portal != nil {
distSQLMode = sessiondatapb.DistSQLOff
}
distributePlan := getPlanDistribution(
ctx, planner.Descriptors().HasUncommittedTypes(),
ex.sessionData().DistSQLMode, planner.curPlan.main,
distSQLMode, planner.curPlan.main,
)
ex.sessionTracing.TracePlanCheckEnd(ctx, nil, distributePlan.WillDistribute())

Expand Down Expand Up @@ -1918,6 +1924,16 @@ func (ex *connExecutor) execWithDistSQLEngine(
factoryEvalCtx.SessionID = planner.ExtendedEvalContext().SessionID
return factoryEvalCtx
}
// We don't sub / post queries for pausable portal. Set it back to an
// un-pausable (normal) portal.
if planCtx.getPortalPauseInfo() != nil {
// With pauseInfo is nil, no cleanup function will be added to the stack
// and all clean-up steps will be performed as for normal portals.
planCtx.planner.portal.pauseInfo = nil
// We need this so that the result consumption for this portal cannot be
// paused either.
res.UnsetForPausablePortal()
}
}
err = ex.server.cfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, planner, recv, evalCtxFactory)
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func (ex *connExecutor) execBind(
}

// Create the new PreparedPortal.
if err := ex.addPortal(ctx, portalName, ps, qargs, columnFormatCodes); err != nil {
if err := ex.addPortal(ctx, portalName, ps, qargs, bindCmd.isInternal, columnFormatCodes); err != nil {
return retErr(err)
}

Expand All @@ -493,16 +493,17 @@ func (ex *connExecutor) addPortal(
portalName string,
stmt *PreparedStatement,
qargs tree.QueryArguments,
isInternal bool,
outFormats []pgwirebase.FormatCode,
) error {
if _, ok := ex.extraTxnState.prepStmtsNamespace.portals[portalName]; ok {
panic(errors.AssertionFailedf("portal already exists: %q", portalName))
return nil
}
if cursor := ex.getCursorAccessor().getCursor(tree.Name(portalName)); cursor != nil {
panic(errors.AssertionFailedf("portal already exists as cursor: %q", portalName))
return nil
}

portal, err := ex.makePreparedPortal(ctx, portalName, stmt, qargs, outFormats)
portal, err := ex.makePreparedPortal(ctx, portalName, stmt, qargs, isInternal, outFormats)
if err != nil {
return err
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ type BindStmt struct {
// inferred types should reflect that).
// If internalArgs is specified, Args and ArgFormatCodes are ignored.
internalArgs []tree.Datum

// isInternal is set ture when the bind stmt is from an internal executor.
isInternal bool
}

// command implements the Command interface.
Expand Down Expand Up @@ -811,6 +814,10 @@ type RestrictedCommandResult interface {
// GetBulkJobId returns the id of the job for the query, if the query is
// IMPORT, BACKUP or RESTORE.
GetBulkJobId() uint64

// UnsetForPausablePortal is to set the forPausablePortal field to false for
// pgwire.limitedCommandResult, so that the portal becomes un-pausable.
UnsetForPausablePortal()
}

// DescribeResult represents the result of a Describe command (for either
Expand Down Expand Up @@ -965,6 +972,10 @@ type streamingCommandResult struct {
var _ RestrictedCommandResult = &streamingCommandResult{}
var _ CommandResultClose = &streamingCommandResult{}

// UnsetForPausablePortal is part of the sql.RestrictedCommandResult interface.
func (r *streamingCommandResult) UnsetForPausablePortal() {
}

// SetColumns is part of the RestrictedCommandResult interface.
func (r *streamingCommandResult) SetColumns(ctx context.Context, cols colinfo.ResultColumns) {
// The interface allows for cols to be nil, yet the iterator result expects
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,7 @@ func (ie *InternalExecutor) execInternal(
return nil, err
}

if err := stmtBuf.Push(ctx, BindStmt{internalArgs: datums}); err != nil {
if err := stmtBuf.Push(ctx, BindStmt{internalArgs: datums, isInternal: true}); err != nil {
return nil, err
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/pgwire/command_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ type paramStatusUpdate struct {

var _ sql.CommandResult = &commandResult{}

// UnsetForPausablePortal is part of the sql.RestrictedCommandResult interface.
func (r *commandResult) UnsetForPausablePortal() {}

// Close is part of the sql.RestrictedCommandResult interface.
func (r *commandResult) Close(ctx context.Context, t sql.TransactionStatusIndicator) {
r.assertNotReleased()
Expand Down Expand Up @@ -452,6 +455,8 @@ type limitedCommandResult struct {
forPausablePortal bool
}

var _ sql.RestrictedCommandResult = &limitedCommandResult{}

// AddRow is part of the sql.RestrictedCommandResult interface.
func (r *limitedCommandResult) AddRow(ctx context.Context, row tree.Datums) error {
if err := r.commandResult.AddRow(ctx, row); err != nil {
Expand Down Expand Up @@ -481,6 +486,11 @@ func (r *limitedCommandResult) AddRow(ctx context.Context, row tree.Datums) erro
return nil
}

// UnsetForPausablePortal is part of the sql.RestrictedCommandResult interface.
func (r *limitedCommandResult) UnsetForPausablePortal() {
r.forPausablePortal = false
}

// SupportsAddBatch is part of the sql.RestrictedCommandResult interface.
// TODO(yuzefovich): implement limiting behavior for AddBatch.
func (r *limitedCommandResult) SupportsAddBatch() bool {
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/prepared_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func (ex *connExecutor) makePreparedPortal(
name string,
stmt *PreparedStatement,
qargs tree.QueryArguments,
isInternal bool,
outFormats []pgwirebase.FormatCode,
) (PreparedPortal, error) {
portal := PreparedPortal{
Expand All @@ -162,7 +163,7 @@ func (ex *connExecutor) makePreparedPortal(
// TODO(janexing): maybe we should also add telemetry for the stmt that the
// portal hooks on.
telemetry.Inc(sqltelemetry.MultipleActivePortalCounter)
if tree.IsReadOnly(stmt.AST) {
if tree.IsReadOnly(stmt.AST) && !isInternal {
portal.pauseInfo = &portalPauseInfo{}
}
}
Expand Down

0 comments on commit 4197e9b

Please sign in to comment.