sql: reuse flow for pausable portal
To execute portals in an interleaved manner, we need to persist the flow and
the queryID so that we can _continue_ fetching the result when we come back to
the same portal.

We now introduce a `pauseInfo` field in `sql.PreparedPortal` that stores this
metadata. It is populated during the portal's first execution, and all
subsequent executions reuse the flow and the queryID. This also implies that
these resources should not be cleaned up at the end of each execution; the
implementation of the clean-up steps is included in the next commit.

This commit also attaches a `*PreparedPortal` to the planner; how it is set is
likewise shown in the next commit.

Release note: None
ZhouXing19 committed Mar 22, 2023
1 parent 2a25067 commit a79adba
Showing 5 changed files with 121 additions and 45 deletions.
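Before diving into the diff, here is a minimal, self-contained Go sketch of the
pattern this commit introduces: on a portal's first execution the flow is
created and retained, and later executions drain more rows from the same flow
instead of re-planning. All names (`portal`, `pauseInfo`, `flow`, `startFlow`)
are invented stand-ins for illustration, not CockroachDB's actual types, and
the real retained state also includes the queryID.

package main

import "fmt"

// flow models a running query execution: a goroutine streaming result
// rows into a channel. (Illustrative only; the real flowinfra.Flow is
// far richer.)
type flow struct {
	rows chan int
}

func startFlow(totalRows int) *flow {
	f := &flow{rows: make(chan int)}
	go func() {
		defer close(f.rows)
		for i := 0; i < totalRows; i++ {
			f.rows <- i
		}
	}()
	return f
}

// pauseInfo persists execution state across executions of one portal,
// mirroring the role of portalPauseInfo in this commit.
type pauseInfo struct {
	flow *flow
}

// portal models a pausable portal: execute(maxRows) fetches at most
// maxRows rows and returns, leaving the flow open for the next call.
type portal struct {
	pauseInfo pauseInfo
}

func (p *portal) execute(maxRows int) (out []int, done bool) {
	if p.pauseInfo.flow == nil {
		// First execution: plan the query and start the flow, then
		// retain the flow instead of tearing it down.
		p.pauseInfo.flow = startFlow(10)
	}
	// Later executions skip planning and continue draining the
	// retained flow from where the previous execution stopped.
	for i := 0; i < maxRows; i++ {
		row, ok := <-p.pauseInfo.flow.rows
		if !ok {
			return out, true
		}
		out = append(out, row)
	}
	return out, false
}

func main() {
	var p1, p2 portal
	// Interleave two portals; each picks up where it left off.
	fmt.Println(p1.execute(3)) // [0 1 2] false
	fmt.Println(p2.execute(2)) // [0 1] false
	fmt.Println(p1.execute(3)) // [3 4 5] false
	fmt.Println(p2.execute(9)) // [2 3 4 5 6 7 8 9] true
}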
72 changes: 39 additions & 33 deletions pkg/sql/conn_executor_exec.go
@@ -1734,42 +1734,48 @@ func (ex *connExecutor) execWithDistSQLEngine(
 	}
 	defer recv.Release()

-	evalCtx := planner.ExtendedEvalContext()
-	planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner,
-		planner.txn, distribute)
-	planCtx.stmtType = recv.stmtType
-	// Skip the diagram generation since on this "main" query path we can get it
-	// via the statement bundle.
-	planCtx.skipDistSQLDiagramGeneration = true
-	if ex.server.cfg.TestingKnobs.TestingSaveFlows != nil {
-		planCtx.saveFlows = ex.server.cfg.TestingKnobs.TestingSaveFlows(planner.stmt.SQL)
-	} else if planner.instrumentation.ShouldSaveFlows() {
-		planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery)
-	}
-	planCtx.associateNodeWithComponents = planner.instrumentation.getAssociateNodeWithComponentsFn()
-	planCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()
-
-	var evalCtxFactory func(usedConcurrently bool) *extendedEvalContext
-	if len(planner.curPlan.subqueryPlans) != 0 ||
-		len(planner.curPlan.cascades) != 0 ||
-		len(planner.curPlan.checkPlans) != 0 {
-		var serialEvalCtx extendedEvalContext
-		ex.initEvalCtx(ctx, &serialEvalCtx, planner)
-		evalCtxFactory = func(usedConcurrently bool) *extendedEvalContext {
-			// Reuse the same object if this factory is not used concurrently.
-			factoryEvalCtx := &serialEvalCtx
-			if usedConcurrently {
-				factoryEvalCtx = &extendedEvalContext{}
-				ex.initEvalCtx(ctx, factoryEvalCtx, planner)
-			}
-			ex.resetEvalCtx(factoryEvalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp)
-			factoryEvalCtx.Placeholders = &planner.semaCtx.Placeholders
-			factoryEvalCtx.Annotations = &planner.semaCtx.Annotations
-			factoryEvalCtx.SessionID = planner.ExtendedEvalContext().SessionID
-			return factoryEvalCtx
-		}
-	}
-	err := ex.server.cfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, planner, recv, evalCtxFactory)
+	var err error
+
+	if planner.hasFlowForPausablePortal() {
+		err = planner.resumeFlowForPausablePortal(recv)
+	} else {
+		evalCtx := planner.ExtendedEvalContext()
+		planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner,
+			planner.txn, distribute)
+		planCtx.stmtType = recv.stmtType
+		// Skip the diagram generation since on this "main" query path we can get it
+		// via the statement bundle.
+		planCtx.skipDistSQLDiagramGeneration = true
+		if ex.server.cfg.TestingKnobs.TestingSaveFlows != nil {
+			planCtx.saveFlows = ex.server.cfg.TestingKnobs.TestingSaveFlows(planner.stmt.SQL)
+		} else if planner.instrumentation.ShouldSaveFlows() {
+			planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery)
+		}
+		planCtx.associateNodeWithComponents = planner.instrumentation.getAssociateNodeWithComponentsFn()
+		planCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()
+
+		var evalCtxFactory func(usedConcurrently bool) *extendedEvalContext
+		if len(planner.curPlan.subqueryPlans) != 0 ||
+			len(planner.curPlan.cascades) != 0 ||
+			len(planner.curPlan.checkPlans) != 0 {
+			var serialEvalCtx extendedEvalContext
+			ex.initEvalCtx(ctx, &serialEvalCtx, planner)
+			evalCtxFactory = func(usedConcurrently bool) *extendedEvalContext {
+				// Reuse the same object if this factory is not used concurrently.
+				factoryEvalCtx := &serialEvalCtx
+				if usedConcurrently {
+					factoryEvalCtx = &extendedEvalContext{}
+					ex.initEvalCtx(ctx, factoryEvalCtx, planner)
+				}
+				ex.resetEvalCtx(factoryEvalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp)
+				factoryEvalCtx.Placeholders = &planner.semaCtx.Placeholders
+				factoryEvalCtx.Annotations = &planner.semaCtx.Annotations
+				factoryEvalCtx.SessionID = planner.ExtendedEvalContext().SessionID
+				return factoryEvalCtx
+			}
+		}
+		err = ex.server.cfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, planner, recv, evalCtxFactory)
+	}
 	return recv.stats, err
 }

9 changes: 9 additions & 0 deletions pkg/sql/distsql_physical_planner.go
@@ -883,6 +883,15 @@ func (p *PlanningCtx) IsLocal() bool {
 	return p.isLocal
 }

+// getPortalPauseInfo returns the portal pause info if the current planner is
+// for a pausable portal. Otherwise, returns nil.
+func (p *PlanningCtx) getPortalPauseInfo() *portalPauseInfo {
+	if p.planner != nil && p.planner.pausablePortal != nil && p.planner.pausablePortal.pauseInfo != nil {
+		return p.planner.pausablePortal.pauseInfo
+	}
+	return nil
+}
+
 // getDefaultSaveFlowsFunc returns the default function used to save physical
 // plans and their diagrams. The returned function is **not** concurrency-safe.
 func (p *PlanningCtx) getDefaultSaveFlowsFunc(
34 changes: 26 additions & 8 deletions pkg/sql/distsql_running.go
@@ -833,14 +833,31 @@ func (dsp *DistSQLPlanner) Run(
 	if planCtx.planner != nil {
 		statementSQL = planCtx.planner.stmt.StmtNoConstants
 	}
-	ctx, flow, err := dsp.setupFlows(
-		ctx, evalCtx, planCtx, leafInputState, flows, recv, localState, statementSQL,
-	)
-	// Make sure that the local flow is always cleaned up if it was created.
+
+	var flow flowinfra.Flow
+	var err error
+	if i := planCtx.getPortalPauseInfo(); i != nil && i.flow != nil {
+		flow = i.flow
+	} else {
+		ctx, flow, err = dsp.setupFlows(
+			ctx, evalCtx, planCtx, leafInputState, flows, recv, localState, statementSQL,
+		)
+		if i != nil {
+			i.flow = flow
+			i.outputTypes = plan.GetResultTypes()
+		}
+	}
+
 	if flow != nil {
-		defer func() {
-			flow.Cleanup(ctx)
-		}()
+		// Make sure that the local flow is always cleaned up if it was created.
+		// If the flow is not for a retained portal, we clean the flow up here.
+		// Otherwise, we delay the clean-up, via portalPauseInfo.flowCleanup, until
+		// the portal is closed.
+		if planCtx.getPortalPauseInfo() == nil {
+			defer func() {
+				flow.Cleanup(ctx)
+			}()
+		}
 	}
 	if err != nil {
 		recv.SetError(err)
@@ -863,7 +880,8 @@ func (dsp *DistSQLPlanner) Run(
 		return
 	}

-	flow.Run(ctx, false /* noWait */)
+	noWait := planCtx.getPortalPauseInfo() != nil
+	flow.Run(ctx, noWait)
 }

 // DistSQLReceiver is an execinfra.RowReceiver and execinfra.BatchReceiver that
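The `noWait` change above is what lets a pausable portal hand control back to
the session while its flow stays alive for a later `Resume`. Below is a rough,
self-contained sketch of that contract, under the assumption that resuming
means re-attaching a fresh receiver per execution (as
`resumeFlowForPausablePortal` does when it re-installs `recv.outputTypes`);
`sketchFlow` and `rowReceiver` are invented stand-ins, not the real
`flowinfra` API.

package main

import "fmt"

// rowReceiver stands in for DistSQLReceiver: one per execution.
type rowReceiver struct{ rows []int }

func (r *rowReceiver) push(row int) { r.rows = append(r.rows, row) }

// sketchFlow stands in for a retained flow whose producer outlives a
// single execution of the portal.
type sketchFlow struct {
	src  chan int
	recv *rowReceiver // swapped on every resumption
}

// run starts the producer and drains up to limit rows; with noWait it
// returns without waiting for the producer to finish, mirroring
// flow.Run(ctx, noWait) above.
func (f *sketchFlow) run(noWait bool, limit int) {
	f.src = make(chan int)
	go func() {
		defer close(f.src)
		for i := 0; i < 10; i++ {
			f.src <- i
		}
	}()
	f.drain(limit)
	if !noWait {
		for range f.src { // block until the producer is exhausted
		}
	}
}

// resume re-attaches a new receiver and continues from where the
// previous execution stopped, mirroring flow.Resume(recv).
func (f *sketchFlow) resume(recv *rowReceiver, limit int) {
	f.recv = recv
	f.drain(limit)
}

func (f *sketchFlow) drain(limit int) {
	for i := 0; i < limit; i++ {
		row, ok := <-f.src
		if !ok {
			return
		}
		f.recv.push(row)
	}
}

func main() {
	first := &rowReceiver{}
	f := &sketchFlow{recv: first}
	f.run(true /* noWait */, 3) // first execution: rows 0-2
	fmt.Println(first.rows)     // [0 1 2]

	second := &rowReceiver{}
	f.resume(second, 3)      // re-execution: rows 3-5, fresh receiver
	fmt.Println(second.rows) // [3 4 5]
}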
23 changes: 23 additions & 0 deletions pkg/sql/planner.go
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
"github.com/lib/pq/oid"
@@ -201,6 +202,9 @@ type planner struct {
 	// home region is being enforced.
 	StmtNoConstantsWithHomeRegionEnforced string

+	// pausablePortal is set when the query is from a pausable portal.
+	pausablePortal *PreparedPortal
+
 	instrumentation instrumentationHelper

 	// Contexts for different stages of planning and execution.
@@ -260,6 +264,24 @@ type planner struct {
 	evalCatalogBuiltins evalcatalog.Builtins
 }

+// hasFlowForPausablePortal returns true if the planner is for re-executing a
+// portal. We reuse the flow stored in p.pausablePortal.pauseInfo.
+func (p *planner) hasFlowForPausablePortal() bool {
+	return p.pausablePortal != nil && p.pausablePortal.pauseInfo != nil && p.pausablePortal.pauseInfo.flow != nil
+}
+
+// resumeFlowForPausablePortal is called when re-executing a portal. We reuse
+// the flow with a new receiver, without re-generating the physical plan.
+func (p *planner) resumeFlowForPausablePortal(recv *DistSQLReceiver) error {
+	if !p.hasFlowForPausablePortal() {
+		return errors.AssertionFailedf("no flow found for pausable portal")
+	}
+	recv.discardRows = p.instrumentation.ShouldDiscardRows()
+	recv.outputTypes = p.pausablePortal.pauseInfo.outputTypes
+	p.pausablePortal.pauseInfo.flow.Resume(recv)
+	return recv.commErr
+}
+
 func (evalCtx *extendedEvalContext) setSessionID(sessionID clusterunique.ID) {
 	evalCtx.SessionID = sessionID
 }
@@ -848,6 +870,7 @@ func (p *planner) resetPlanner(
 	p.evalCatalogBuiltins.Init(p.execCfg.Codec, txn, p.Descriptors())
 	p.skipDescriptorCache = false
 	p.typeResolutionDbID = descpb.InvalidID
+	p.pausablePortal = nil
 }

 // GetReplicationStreamManager returns a ReplicationStreamManager.
28 changes: 24 additions & 4 deletions pkg/sql/prepared_stmt.go
@@ -15,10 +15,12 @@ import (
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
"github.com/cockroachdb/cockroach/pkg/sql/querycache"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
)
@@ -155,6 +157,9 @@ type PreparedPortal struct {
 	// a portal.
 	// See comments for PortalPausablity for more details.
 	portalPausablity PortalPausablity
+
+	// pauseInfo is the saved info needed for a pausable portal.
+	pauseInfo *portalPauseInfo
 }

 // makePreparedPortal creates a new PreparedPortal.
@@ -173,10 +178,11 @@ func (ex *connExecutor) makePreparedPortal(
 		Qargs:      qargs,
 		OutFormats: outFormats,
 	}
-	// TODO(janexing): we added this line to avoid the unused lint error.
-	// Will remove it once the whole functionality of multple active portals
-	// is merged.
-	_ = enableMultipleActivePortals.Get(&ex.server.cfg.Settings.SV)
+
+	if EnableMultipleActivePortals.Get(&ex.server.cfg.Settings.SV) {
+		portal.pauseInfo = &portalPauseInfo{}
+		portal.portalPausablity = PausablePortal
+	}
 	return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
 }

@@ -204,3 +210,17 @@ func (p *PreparedPortal) close(
 func (p *PreparedPortal) size(portalName string) int64 {
 	return int64(uintptr(len(portalName)) + unsafe.Sizeof(p))
 }
+
+// portalPauseInfo stores the info needed to pause a portal. After pausing the
+// portal, the session can execute other statements and later come back to
+// re-execute or close it.
+type portalPauseInfo struct {
+	// outputTypes are the types of the result columns produced by the physical
+	// plan. We need this because when re-executing the portal, we reuse the
+	// flow with a new receiver rather than re-generating the physical plan.
+	outputTypes []*types.T
+	// We need to store the flow for a portal so that when re-executing it, we
+	// continue from the previous execution. It lives along with the portal and
+	// will be cleaned up when the portal is closed.
+	flow flowinfra.Flow
+}

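The comment on `flow` pins down the ownership rule this commit depends on: the
flow belongs to the portal, and individual executions merely borrow it. A
minimal standalone sketch of that rule follows, with all names invented for
illustration; the actual cleanup wiring lands in the next commit.

package main

import "fmt"

// toyFlow stands in for a retained flowinfra.Flow.
type toyFlow struct{ cleaned bool }

func (f *toyFlow) cleanup() { f.cleaned = true }

// toyPortal owns its flow: executions borrow it, close releases it.
type toyPortal struct{ flow *toyFlow }

func (p *toyPortal) execute() {
	if p.flow == nil {
		p.flow = &toyFlow{} // first execution creates the flow
	}
	// ...fetch some rows; crucially, no cleanup happens here...
}

func (p *toyPortal) close() {
	if p.flow != nil {
		p.flow.cleanup() // cleanup happens exactly once, at portal close
		p.flow = nil
	}
}

func main() {
	p := &toyPortal{}
	p.execute()
	p.execute() // reuses the same flow; no re-planning, no cleanup
	p.close()   // the only place the flow is torn down
	fmt.Println("flow released")
}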