Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.1: sql: enable resumption of a flow for pausable portals #99237

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ go_library(
"//pkg/sql/sqlstats/insights",
"//pkg/sql/sqlstats/persistedsqlstats",
"//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil",
"//pkg/sql/sqltelemetry",
"//pkg/sql/stats",
"//pkg/sql/stmtdiagnostics",
"//pkg/sql/syntheticprivilege",
Expand Down
8 changes: 8 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
"github.com/cockroachdb/cockroach/pkg/server/status"
"github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/server/tracedumper"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -103,6 +104,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilegecache"
Expand Down Expand Up @@ -1342,6 +1344,12 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
vmoduleSetting.SetOnChange(&cfg.Settings.SV, fn)
fn(ctx)

sql.EnableMultipleActivePortals.SetOnChange(&cfg.Settings.SV, func(ctx context.Context) {
if sql.EnableMultipleActivePortals.Get(&cfg.Settings.SV) {
telemetry.Inc(sqltelemetry.MultipleActivePortalCounter)
}
})

return &SQLServer{
ambientCtx: cfg.BaseConfig.AmbientCtx,
stopper: cfg.stopper,
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/backfill/mvcc_index_merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,11 @@ func (ibm *IndexBackfillMerger) shrinkBoundAccount(ctx context.Context, shrinkBy
ibm.muBoundAccount.boundAccount.Shrink(ctx, shrinkBy)
}

// Resume is part of the execinfra.Processor interface.
//
// IndexBackfillMerger does not support being resumed: this method
// unconditionally panics with "not implemented" and never touches output.
func (ibm *IndexBackfillMerger) Resume(output execinfra.RowReceiver) {
panic("not implemented")
}

// NewIndexBackfillMerger creates a new IndexBackfillMerger.
func NewIndexBackfillMerger(
ctx context.Context, flowCtx *execinfra.FlowCtx, spec execinfrapb.IndexBackfillMergerSpec,
Expand Down
23 changes: 20 additions & 3 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,16 +288,33 @@ func (f *vectorizedFlow) Setup(
return ctx, opChains, nil
}

// Resume is part of the Flow interface.
//
// Resumption is delegated to the embedded FlowBase, which is only valid when
// no BatchFlowCoordinator was created for this flow (i.e. the flow has a
// processor as its root). If a BatchFlowCoordinator is present, an assertion
// failure is pushed to recv as metadata and recv is marked as done instead.
func (f *vectorizedFlow) Resume(recv execinfra.RowReceiver) {
	if f.batchFlowCoordinator == nil {
		// Processor-rooted flow: the default implementation handles it.
		f.FlowBase.Resume(recv)
		return
	}
	err := errors.AssertionFailedf(
		"batchFlowCoordinator should be nil for vectorizedFlow",
	)
	recv.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err})
	recv.ProducerDone()
}

// Run is part of the Flow interface.
func (f *vectorizedFlow) Run(ctx context.Context) {
func (f *vectorizedFlow) Run(ctx context.Context, noWait bool) {
if f.batchFlowCoordinator == nil {
// If we didn't create a BatchFlowCoordinator, then we have a processor
// as the root, so we run this flow with the default implementation.
f.FlowBase.Run(ctx)
f.FlowBase.Run(ctx, noWait)
return
}

defer f.Wait()
if !noWait {
defer f.Wait()
}

if err := f.StartInternal(ctx, nil /* processors */, nil /* outputs */); err != nil {
f.GetRowSyncFlowConsumer().Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err})
Expand Down
72 changes: 39 additions & 33 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1734,42 +1734,48 @@ func (ex *connExecutor) execWithDistSQLEngine(
}
defer recv.Release()

evalCtx := planner.ExtendedEvalContext()
planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner,
planner.txn, distribute)
planCtx.stmtType = recv.stmtType
// Skip the diagram generation since on this "main" query path we can get it
// via the statement bundle.
planCtx.skipDistSQLDiagramGeneration = true
if ex.server.cfg.TestingKnobs.TestingSaveFlows != nil {
planCtx.saveFlows = ex.server.cfg.TestingKnobs.TestingSaveFlows(planner.stmt.SQL)
} else if planner.instrumentation.ShouldSaveFlows() {
planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery)
}
planCtx.associateNodeWithComponents = planner.instrumentation.getAssociateNodeWithComponentsFn()
planCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()

var evalCtxFactory func(usedConcurrently bool) *extendedEvalContext
if len(planner.curPlan.subqueryPlans) != 0 ||
len(planner.curPlan.cascades) != 0 ||
len(planner.curPlan.checkPlans) != 0 {
var serialEvalCtx extendedEvalContext
ex.initEvalCtx(ctx, &serialEvalCtx, planner)
evalCtxFactory = func(usedConcurrently bool) *extendedEvalContext {
// Reuse the same object if this factory is not used concurrently.
factoryEvalCtx := &serialEvalCtx
if usedConcurrently {
factoryEvalCtx = &extendedEvalContext{}
ex.initEvalCtx(ctx, factoryEvalCtx, planner)
var err error

if planner.hasFlowForPausablePortal() {
err = planner.resumeFlowForPausablePortal(recv)
} else {
evalCtx := planner.ExtendedEvalContext()
planCtx := ex.server.cfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, planner,
planner.txn, distribute)
planCtx.stmtType = recv.stmtType
// Skip the diagram generation since on this "main" query path we can get it
// via the statement bundle.
planCtx.skipDistSQLDiagramGeneration = true
if ex.server.cfg.TestingKnobs.TestingSaveFlows != nil {
planCtx.saveFlows = ex.server.cfg.TestingKnobs.TestingSaveFlows(planner.stmt.SQL)
} else if planner.instrumentation.ShouldSaveFlows() {
planCtx.saveFlows = planCtx.getDefaultSaveFlowsFunc(ctx, planner, planComponentTypeMainQuery)
}
planCtx.associateNodeWithComponents = planner.instrumentation.getAssociateNodeWithComponentsFn()
planCtx.collectExecStats = planner.instrumentation.ShouldCollectExecStats()

var evalCtxFactory func(usedConcurrently bool) *extendedEvalContext
if len(planner.curPlan.subqueryPlans) != 0 ||
len(planner.curPlan.cascades) != 0 ||
len(planner.curPlan.checkPlans) != 0 {
var serialEvalCtx extendedEvalContext
ex.initEvalCtx(ctx, &serialEvalCtx, planner)
evalCtxFactory = func(usedConcurrently bool) *extendedEvalContext {
// Reuse the same object if this factory is not used concurrently.
factoryEvalCtx := &serialEvalCtx
if usedConcurrently {
factoryEvalCtx = &extendedEvalContext{}
ex.initEvalCtx(ctx, factoryEvalCtx, planner)
}
ex.resetEvalCtx(factoryEvalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp)
factoryEvalCtx.Placeholders = &planner.semaCtx.Placeholders
factoryEvalCtx.Annotations = &planner.semaCtx.Annotations
factoryEvalCtx.SessionID = planner.ExtendedEvalContext().SessionID
return factoryEvalCtx
}
ex.resetEvalCtx(factoryEvalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp)
factoryEvalCtx.Placeholders = &planner.semaCtx.Placeholders
factoryEvalCtx.Annotations = &planner.semaCtx.Annotations
factoryEvalCtx.SessionID = planner.ExtendedEvalContext().SessionID
return factoryEvalCtx
}
err = ex.server.cfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, planner, recv, evalCtxFactory)
}
err := ex.server.cfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, planner, recv, evalCtxFactory)
return recv.stats, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql/vectorized_panic_propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,5 @@ func TestNonVectorizedPanicDoesntHangServer(t *testing.T) {
}),
)

require.Panics(t, func() { flow.Run(ctx) })
require.Panics(t, func() { flow.Run(ctx, false /* noWait */) })
}
9 changes: 9 additions & 0 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,15 @@ func (p *PlanningCtx) IsLocal() bool {
return p.isLocal
}

// getPortalPauseInfo returns the pause info of the pausable portal that the
// planner attached to this planning context is executing. It returns nil when
// there is no planner, when the planner has no pausable portal, or when that
// portal carries no pause info.
func (p *PlanningCtx) getPortalPauseInfo() *portalPauseInfo {
	if p.planner == nil {
		return nil
	}
	portal := p.planner.pausablePortal
	if portal == nil {
		return nil
	}
	// A nil pauseInfo is returned as-is, which is the same nil result.
	return portal.pauseInfo
}

// getDefaultSaveFlowsFunc returns the default function used to save physical
// plans and their diagrams. The returned function is **not** concurrency-safe.
func (p *PlanningCtx) getDefaultSaveFlowsFunc(
Expand Down
34 changes: 26 additions & 8 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,14 +833,31 @@ func (dsp *DistSQLPlanner) Run(
if planCtx.planner != nil {
statementSQL = planCtx.planner.stmt.StmtNoConstants
}
ctx, flow, err := dsp.setupFlows(
ctx, evalCtx, planCtx, leafInputState, flows, recv, localState, statementSQL,
)
// Make sure that the local flow is always cleaned up if it was created.

var flow flowinfra.Flow
var err error
if i := planCtx.getPortalPauseInfo(); i != nil && i.flow != nil {
flow = i.flow
} else {
ctx, flow, err = dsp.setupFlows(
ctx, evalCtx, planCtx, leafInputState, flows, recv, localState, statementSQL,
)
if i != nil {
i.flow = flow
i.outputTypes = plan.GetResultTypes()
}
}

if flow != nil {
defer func() {
flow.Cleanup(ctx)
}()
// Make sure that the local flow is always cleaned up if it was created.
// If the flow is not for a retained portal, we clean the flow up here.
// Otherwise, we delay the clean up via portalPauseInfo.flowCleanup until
// the portal is closed.
if planCtx.getPortalPauseInfo() == nil {
defer func() {
flow.Cleanup(ctx)
}()
}
}
if err != nil {
recv.SetError(err)
Expand All @@ -863,7 +880,8 @@ func (dsp *DistSQLPlanner) Run(
return
}

flow.Run(ctx)
noWait := planCtx.getPortalPauseInfo() != nil
flow.Run(ctx, noWait)
}

// DistSQLReceiver is an execinfra.RowReceiver and execinfra.BatchReceiver that
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ var overrideAlterPrimaryRegionInSuperRegion = settings.RegisterBoolSetting(
false,
).WithPublic()

var enableMultipleActivePortals = settings.RegisterBoolSetting(
var EnableMultipleActivePortals = settings.RegisterBoolSetting(
settings.TenantWritable,
"sql.pgwire.multiple_active_portals.enabled",
"if true, portals with read-only SELECT query without sub/post queries "+
Expand Down
20 changes: 19 additions & 1 deletion pkg/sql/execinfra/processorsbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,18 @@ type Processor interface {
// and the vectorized engines).
MustBeStreaming() bool

// Run is the main loop of the processor.
// Run is the main loop of the processor. It can be called only once
// throughout the processor's lifetime.
Run(context.Context, RowReceiver)

// Resume resumes the execution of the processor with the new receiver. It
// can be called many times but after Run() has already been called.
//
// Currently only used by the pausable portals.
//
// NB: this method doesn't take the context as parameter because the context
// was already captured on Run().
Resume(output RowReceiver)
}

// DoesNotUseTxn is an interface implemented by some processors to mark that
Expand Down Expand Up @@ -727,6 +737,14 @@ func (pb *ProcessorBaseNoHelper) Run(ctx context.Context, output RowReceiver) {
Run(pb.ctx, pb.self, output)
}

// Resume is part of the Processor interface.
//
// It re-enters the package-level Run loop for pb.self with the given output,
// using the context stored in pb.ctx rather than taking a new one. A nil
// output is a programming error and results in a panic.
func (pb *ProcessorBaseNoHelper) Resume(output RowReceiver) {
	if output != nil {
		Run(pb.ctx, pb.self, output)
		return
	}
	panic("processor output is not provided for emitting rows")
}

// ProcStateOpts contains fields used by the ProcessorBase's family of functions
// that deal with draining and trailing metadata: the ProcessorBase implements
// generic useful functionality that needs to call back into the Processor.
Expand Down
45 changes: 42 additions & 3 deletions pkg/sql/flowinfra/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,23 @@ type Flow interface {
// It is assumed that rowSyncFlowConsumer is set, so all errors encountered
// when running this flow are sent to it.
//
// noWait is set true when the flow is bound to a pausable portal. With it set,
// the function returns without waiting for all the goroutines to finish. For a
// pausable portal we will persist this flow and reuse it when re-executing
// the portal. The flow will be cleaned when the portal is closed, rather than
// when each portal execution finishes.
//
// The caller needs to call f.Cleanup().
Run(context.Context)
Run(ctx context.Context, noWait bool)

// Resume continues running the Flow after it has been paused with the new
// output receiver. The Flow is expected to have exactly one processor.
// It is called when resuming a paused portal.
// The lifecycle of a flow for a pausable portal is:
// - flow.Run(ctx, true /* noWait */) (only once)
// - flow.Resume() (for all re-executions of the portal)
// - flow.Cleanup() (only once)
Resume(recv execinfra.RowReceiver)

// Wait waits for all the goroutines for this flow to exit. If the context gets
// canceled before all goroutines exit, it calls f.cancel().
Expand Down Expand Up @@ -169,6 +184,9 @@ type Flow interface {
type FlowBase struct {
execinfra.FlowCtx

// resumeCtx is only captured for using inside of Flow.Resume() implementations.
resumeCtx context.Context

flowRegistry *FlowRegistry

// processors contains a subset of the processors in the flow - the ones that
Expand Down Expand Up @@ -488,9 +506,29 @@ func (f *FlowBase) Start(ctx context.Context) error {
return f.StartInternal(ctx, f.processors, f.outputs)
}

// Resume is part of the Flow interface.
//
// The flow must have exactly one processor and exactly one output. When it
// does, recv replaces the sole output and the sole processor is resumed with
// it, logging against the stored resumeCtx. Otherwise an assertion failure is
// pushed to recv as metadata and recv is marked as done.
func (f *FlowBase) Resume(recv execinfra.RowReceiver) {
	if len(f.processors) == 1 && len(f.outputs) == 1 {
		proc := f.processors[0]
		f.outputs[0] = recv
		log.VEventf(f.resumeCtx, 1, "resuming %T in the flow's goroutine", proc)
		proc.Resume(recv)
		return
	}

	err := errors.AssertionFailedf(
		"length of both the processor and the output must be 1",
	)
	recv.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err})
	recv.ProducerDone()
}

// Run is part of the Flow interface.
func (f *FlowBase) Run(ctx context.Context) {
defer f.Wait()
func (f *FlowBase) Run(ctx context.Context, noWait bool) {
if !noWait {
defer f.Wait()
}

if len(f.processors) == 0 {
f.rowSyncFlowConsumer.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: errors.AssertionFailedf("no processors in flow")})
Expand All @@ -509,6 +547,7 @@ func (f *FlowBase) Run(ctx context.Context) {
f.rowSyncFlowConsumer.ProducerDone()
return
}
f.resumeCtx = ctx
log.VEventf(ctx, 1, "running %T in the flow's goroutine", headProc)
headProc.Run(ctx, headOutput)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/flowinfra/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func runLocalFlow(
if err != nil {
return nil, err
}
flow.Run(flowCtx)
flow.Run(flowCtx, false /* noWait */)
flow.Cleanup(flowCtx)

if !rowBuf.ProducerClosed() {
Expand Down Expand Up @@ -210,7 +210,7 @@ func runLocalFlowTenant(
if err != nil {
return nil, err
}
flow.Run(flowCtx)
flow.Run(flowCtx, false /* noWait */)
flow.Cleanup(flowCtx)

if !rowBuf.ProducerClosed() {
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/importer/exportcsv.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,11 @@ func (sp *csvWriter) Run(ctx context.Context, output execinfra.RowReceiver) {
ctx, output, err, func(context.Context, execinfra.RowReceiver) {} /* pushTrailingMeta */, sp.input)
}

// Resume is part of the execinfra.Processor interface.
//
// csvWriter does not support being resumed: this method unconditionally
// panics with "not implemented" and never touches output.
func (sp *csvWriter) Resume(output execinfra.RowReceiver) {
panic("not implemented")
}

func init() {
rowexec.NewCSVWriterProcessor = newCSVWriterProcessor
}
Loading