diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index 4f349ece15b7..09f4094e9cd5 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -1734,42 +1734,48 @@ func (ex *connExecutor) execWithDistSQLEngine(
 	}
 	defer recv.Release()
 
-	evalCtx := planner.ExtendedEvalContext()
-	planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner,
-		planner.txn, distribute)
-	planCtx.stmtType = recv.stmtType
-	// Skip the diagram generation since on this "main" query path we can get it
-	// via the statement bundle.
-	planCtx.skipDistSQLDiagramGeneration = true
-	if ex.server.cfg.TestingKnobs.TestingSaveFlows != nil {
-		planCtx.saveFlows = ex.server.cfg.TestingKnobs.TestingSaveFlows(planner.stmt.SQL)
-	} else if planner.instrumentation.ShouldSaveFlows() {
-		planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery)
-	}
-	planCtx.associateNodeWithComponents = planner.instrumentation.getAssociateNodeWithComponentsFn()
-	planCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()
-
-	var evalCtxFactory func(usedConcurrently bool) *extendedEvalContext
-	if len(planner.curPlan.subqueryPlans) != 0 ||
-		len(planner.curPlan.cascades) != 0 ||
-		len(planner.curPlan.checkPlans) != 0 {
-		var serialEvalCtx extendedEvalContext
-		ex.initEvalCtx(ctx, &serialEvalCtx, planner)
-		evalCtxFactory = func(usedConcurrently bool) *extendedEvalContext {
-			// Reuse the same object if this factory is not used concurrently.
-			factoryEvalCtx := &serialEvalCtx
-			if usedConcurrently {
-				factoryEvalCtx = &extendedEvalContext{}
-				ex.initEvalCtx(ctx, factoryEvalCtx, planner)
+	var err error
+
+	if planner.hasFlowForPausablePortal() {
+		err = planner.resumeFlowForPausablePortal(recv)
+	} else {
+		evalCtx := planner.ExtendedEvalContext()
+		planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner,
+			planner.txn, distribute)
+		planCtx.stmtType = recv.stmtType
+		// Skip the diagram generation since on this "main" query path we can get it
+		// via the statement bundle.
+		planCtx.skipDistSQLDiagramGeneration = true
+		if ex.server.cfg.TestingKnobs.TestingSaveFlows != nil {
+			planCtx.saveFlows = ex.server.cfg.TestingKnobs.TestingSaveFlows(planner.stmt.SQL)
+		} else if planner.instrumentation.ShouldSaveFlows() {
+			planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery)
+		}
+		planCtx.associateNodeWithComponents = planner.instrumentation.getAssociateNodeWithComponentsFn()
+		planCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()
+
+		var evalCtxFactory func(usedConcurrently bool) *extendedEvalContext
+		if len(planner.curPlan.subqueryPlans) != 0 ||
+			len(planner.curPlan.cascades) != 0 ||
+			len(planner.curPlan.checkPlans) != 0 {
+			var serialEvalCtx extendedEvalContext
+			ex.initEvalCtx(ctx, &serialEvalCtx, planner)
+			evalCtxFactory = func(usedConcurrently bool) *extendedEvalContext {
+				// Reuse the same object if this factory is not used concurrently.
+				factoryEvalCtx := &serialEvalCtx
+				if usedConcurrently {
+					factoryEvalCtx = &extendedEvalContext{}
+					ex.initEvalCtx(ctx, factoryEvalCtx, planner)
+				}
+				ex.resetEvalCtx(factoryEvalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp)
+				factoryEvalCtx.Placeholders = &planner.semaCtx.Placeholders
+				factoryEvalCtx.Annotations = &planner.semaCtx.Annotations
+				factoryEvalCtx.SessionID = planner.ExtendedEvalContext().SessionID
+				return factoryEvalCtx
 			}
-			ex.resetEvalCtx(factoryEvalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp)
-			factoryEvalCtx.Placeholders = &planner.semaCtx.Placeholders
-			factoryEvalCtx.Annotations = &planner.semaCtx.Annotations
-			factoryEvalCtx.SessionID = planner.ExtendedEvalContext().SessionID
-			return factoryEvalCtx
 		}
+		err = ex.server.cfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, planner, recv, evalCtxFactory)
 	}
 
-	err := ex.server.cfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, planner, recv, evalCtxFactory)
 	return recv.stats, err
 }
diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index 3dab75fe8129..656b9056875c 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -883,6 +883,15 @@ func (p *PlanningCtx) IsLocal() bool {
 	return p.isLocal
 }
 
+// getPortalPauseInfo returns the portal pause info if the current planner is
+// for a pausable portal. Otherwise, it returns nil.
+func (p *PlanningCtx) getPortalPauseInfo() *portalPauseInfo {
+	if p.planner != nil && p.planner.pausablePortal != nil && p.planner.pausablePortal.pauseInfo != nil {
+		return p.planner.pausablePortal.pauseInfo
+	}
+	return nil
+}
+
 // getDefaultSaveFlowsFunc returns the default function used to save physical
 // plans and their diagrams. The returned function is **not** concurrency-safe.
 func (p *PlanningCtx) getDefaultSaveFlowsFunc(
diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index a92e9e338a26..5e590e3b8bf0 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -833,14 +833,31 @@ func (dsp *DistSQLPlanner) Run(
 	if planCtx.planner != nil {
 		statementSQL = planCtx.planner.stmt.StmtNoConstants
 	}
-	ctx, flow, err := dsp.setupFlows(
-		ctx, evalCtx, planCtx, leafInputState, flows, recv, localState, statementSQL,
-	)
-	// Make sure that the local flow is always cleaned up if it was created.
+
+	var flow flowinfra.Flow
+	var err error
+	if i := planCtx.getPortalPauseInfo(); i != nil && i.flow != nil {
+		flow = i.flow
+	} else {
+		ctx, flow, err = dsp.setupFlows(
+			ctx, evalCtx, planCtx, leafInputState, flows, recv, localState, statementSQL,
+		)
+		if i != nil {
+			i.flow = flow
+			i.outputTypes = plan.GetResultTypes()
+		}
+	}
+
 	if flow != nil {
-		defer func() {
-			flow.Cleanup(ctx)
-		}()
+		// Make sure that the local flow is always cleaned up if it was created.
+		// If the flow is not for a retained portal, we clean up the flow here.
+		// Otherwise, we delay the cleanup via portalPauseInfo.flowCleanup until
+		// the portal is closed.
+		if planCtx.getPortalPauseInfo() == nil {
+			defer func() {
+				flow.Cleanup(ctx)
+			}()
+		}
 	}
 	if err != nil {
 		recv.SetError(err)
@@ -863,7 +880,8 @@ func (dsp *DistSQLPlanner) Run(
 		return
 	}
 
-	flow.Run(ctx, false /* noWait */)
+	noWait := planCtx.getPortalPauseInfo() != nil
+	flow.Run(ctx, noWait)
 }
 
 // DistSQLReceiver is an execinfra.RowReceiver and execinfra.BatchReceiver that
diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go
index 036f8249dcda..a41eb6081a21 100644
--- a/pkg/sql/planner.go
+++ b/pkg/sql/planner.go
@@ -52,6 +52,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/mon"
+	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/logtags"
 	"github.com/cockroachdb/redact"
 	"github.com/lib/pq/oid"
 )
@@ -201,6 +202,9 @@ type planner struct {
 	// home region is being enforced.
 	StmtNoConstantsWithHomeRegionEnforced string
 
+	// pausablePortal is set when the query is from a pausable portal.
+	pausablePortal *PreparedPortal
+
 	instrumentation instrumentationHelper
 
 	// Contexts for different stages of planning and execution.
@@ -260,6 +264,24 @@ type planner struct {
 	evalCatalogBuiltins evalcatalog.Builtins
 }
 
+// hasFlowForPausablePortal returns true if the planner is for re-executing a
+// portal. We reuse the flow stored in p.pausablePortal.pauseInfo.
+func (p *planner) hasFlowForPausablePortal() bool {
+	return p.pausablePortal != nil && p.pausablePortal.pauseInfo != nil && p.pausablePortal.pauseInfo.flow != nil
+}
+
+// resumeFlowForPausablePortal is called when re-executing a portal. We reuse
+// the flow with a new receiver, without re-generating the physical plan.
+func (p *planner) resumeFlowForPausablePortal(recv *DistSQLReceiver) error {
+	if !p.hasFlowForPausablePortal() {
+		return errors.AssertionFailedf("no flow found for pausable portal")
+	}
+	recv.discardRows = p.instrumentation.ShouldDiscardRows()
+	recv.outputTypes = p.pausablePortal.pauseInfo.outputTypes
+	p.pausablePortal.pauseInfo.flow.Resume(recv)
+	return recv.commErr
+}
+
 func (evalCtx *extendedEvalContext) setSessionID(sessionID clusterunique.ID) {
 	evalCtx.SessionID = sessionID
 }
@@ -848,6 +870,7 @@ func (p *planner) resetPlanner(
 	p.evalCatalogBuiltins.Init(p.execCfg.Codec, txn, p.Descriptors())
 	p.skipDescriptorCache = false
 	p.typeResolutionDbID = descpb.InvalidID
+	p.pausablePortal = nil
 }
 
 // GetReplicationStreamManager returns a ReplicationStreamManager.
diff --git a/pkg/sql/prepared_stmt.go b/pkg/sql/prepared_stmt.go
index 5ad4bbd872c6..7a0c67cc051e 100644
--- a/pkg/sql/prepared_stmt.go
+++ b/pkg/sql/prepared_stmt.go
@@ -15,10 +15,12 @@ import (
 	"time"
 	"unsafe"
 
+	"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirebase"
 	"github.com/cockroachdb/cockroach/pkg/sql/querycache"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/mon"
 )
@@ -155,6 +157,9 @@ type PreparedPortal struct {
 	// a portal.
 	// See comments for PortalPausablity for more details.
 	portalPausablity PortalPausablity
+
+	// pauseInfo is the saved info needed for a pausable portal.
+	pauseInfo *portalPauseInfo
 }
 
 // makePreparedPortal creates a new PreparedPortal.
@@ -173,10 +178,11 @@ func (ex *connExecutor) makePreparedPortal(
 		Qargs:      qargs,
 		OutFormats: outFormats,
 	}
-	// TODO(janexing): we added this line to avoid the unused lint error.
-	// Will remove it once the whole functionality of multple active portals
-	// is merged.
-	_ = enableMultipleActivePortals.Get(&ex.server.cfg.Settings.SV)
+
+	if EnableMultipleActivePortals.Get(&ex.server.cfg.Settings.SV) {
+		portal.pauseInfo = &portalPauseInfo{}
+		portal.portalPausablity = PausablePortal
+	}
 	return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
 }
 
@@ -204,3 +210,17 @@ func (p *PreparedPortal) close(
 func (p *PreparedPortal) size(portalName string) int64 {
 	return int64(uintptr(len(portalName)) + unsafe.Sizeof(p))
 }
+
+// portalPauseInfo stores the info that enables pausing a portal. After a
+// portal is paused, other statements can be executed before the portal is
+// re-executed or closed.
+type portalPauseInfo struct {
+	// outputTypes are the types of the result columns produced by the physical
+	// plan. We need them because, when re-executing the portal, we reuse the
+	// flow with a new receiver rather than re-generating the physical plan.
+	outputTypes []*types.T
+	// flow is retained so that when the portal is re-executed, we continue from
+	// the previous execution. It lives along with the portal, and is cleaned up
+	// when the portal is closed.
+	flow flowinfra.Flow
+}
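
The hunks above all implement one pattern: on first execution of a pausable portal, plan and run the flow with noWait and retain it in portalPauseInfo; on re-execution, skip planning and resume the retained flow with a fresh receiver; cleanup is deferred until the portal closes. The following is a minimal, self-contained Go sketch of that control flow, not CockroachDB code: mockFlow, mockReceiver, pauseInfo, and execute are invented stand-ins for flowinfra.Flow, DistSQLReceiver, portalPauseInfo, and execWithDistSQLEngine.

package main

import "fmt"

// mockFlow stands in for flowinfra.Flow: it can be run once and later resumed,
// keeping its position across executions.
type mockFlow struct{ pos int }

func (f *mockFlow) Run(recv *mockReceiver, noWait bool) { f.emit(recv) }
func (f *mockFlow) Resume(recv *mockReceiver)           { f.emit(recv) }
func (f *mockFlow) Cleanup()                            { fmt.Println("flow cleaned up") }

// emit produces two "rows" per (re-)execution, continuing where it left off.
func (f *mockFlow) emit(recv *mockReceiver) {
	for i := 0; i < 2; i++ {
		recv.rows = append(recv.rows, f.pos)
		f.pos++
	}
}

// mockReceiver stands in for DistSQLReceiver; each re-execution gets a new one.
type mockReceiver struct{ rows []int }

// pauseInfo mirrors portalPauseInfo: the retained flow plus its output types.
type pauseInfo struct {
	flow        *mockFlow
	outputTypes []string
}

// execute mirrors the branch added to execWithDistSQLEngine: resume the
// retained flow if there is one; otherwise "plan" a new flow and, for a
// pausable portal, retain it instead of cleaning it up right away.
func execute(pi *pauseInfo, recv *mockReceiver) {
	if pi != nil && pi.flow != nil {
		pi.flow.Resume(recv)
		return
	}
	flow := &mockFlow{}
	if pi != nil {
		pi.flow = flow
		pi.outputTypes = []string{"INT8"}
		flow.Run(recv, true /* noWait */)
		return // cleanup is delayed until the portal is closed
	}
	defer flow.Cleanup()
	flow.Run(recv, false /* noWait */)
}

func main() {
	pi := &pauseInfo{} // a pausable portal
	first := &mockReceiver{}
	execute(pi, first) // plans and retains the flow
	second := &mockReceiver{}
	execute(pi, second)                  // reuses pi.flow; no re-planning
	fmt.Println(first.rows, second.rows) // [0 1] [2 3]
	pi.flow.Cleanup()                    // portal close
}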