Skip to content

Commit

Permalink
fix parallel run scope never do context rebuild. (#17950)
Browse files Browse the repository at this point in the history
修复了一个parallel run构造出来的pipeline上下文结构错误的问题。

Approved by: @XuPeng-SH, @badboynt1, @ouyuanning
  • Loading branch information
m-schen authored Aug 9, 2024
1 parent c1ca8b6 commit 315fdbe
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 31 deletions.
20 changes: 20 additions & 0 deletions pkg/sql/compile/compile2.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/util/trace"
"github.com/matrixorigin/matrixone/pkg/util/trace/impl/motrace/statistic"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"go.uber.org/zap"
gotrace "runtime/trace"
"time"
Expand Down Expand Up @@ -319,6 +320,12 @@ func (c *Compile) InitPipelineContextToExecuteQuery() {
}
}

// CleanPipelineChannelToNextQuery cleans the channel between each pipeline tree for recall / rerun.
// todo: this has not implement now.
//func (c *Compile) CleanPipelineChannelToNextQuery() {
// // do nothing now.
//}

// buildContextFromParentCtx build the context for the pipeline tree.
// the input parameter is the whole tree's parent context.
func (s *Scope) buildContextFromParentCtx(parentCtx context.Context) {
Expand All @@ -329,3 +336,16 @@ func (s *Scope) buildContextFromParentCtx(parentCtx context.Context) {
prePipeline.buildContextFromParentCtx(receiverCtx)
}
}

// setContextForParallelScope set the context for the parallel scope.
// the difference between this function and the buildContextFromParentCtx is we won't rebuild the context for top scope.
//
// parallel scope is a special scope generated by the scope.ParallelRun.
func setContextForParallelScope(parallelScope *Scope, originalContext context.Context, originalCancel context.CancelFunc) {
process.ReplacePipelineCtx(parallelScope.Proc, originalContext, originalCancel)

// build context for data entry.
for _, prePipeline := range parallelScope.PreScopes {
prePipeline.buildContextFromParentCtx(parallelScope.Proc.Ctx)
}
}
12 changes: 8 additions & 4 deletions pkg/sql/compile/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (s *Scope) Run(c *Compile) (err error) {
if s.DataSource.TableDef != nil {
id = s.DataSource.TableDef.TblId
}
p = pipeline.New(id, s.DataSource.Attributes, s.RootOp, s.Reg)
p = pipeline.New(id, s.DataSource.Attributes, s.RootOp)
if s.DataSource.isConst {
_, err = p.ConstRun(s.DataSource.Bat, s.Proc)
} else {
Expand Down Expand Up @@ -298,7 +298,7 @@ func (s *Scope) MergeRun(c *Compile) error {
}
}()

p := pipeline.NewMerge(s.RootOp, s.Reg)
p := pipeline.NewMerge(s.RootOp)
if _, err := p.MergeRun(s.Proc); err != nil {
select {
case <-s.Proc.Ctx.Done():
Expand Down Expand Up @@ -356,7 +356,7 @@ func (s *Scope) RemoteRun(c *Compile) error {
zap.String("local-address", c.addr),
zap.String("remote-address", s.NodeInfo.Addr))

p := pipeline.New(0, nil, s.RootOp, s.Reg)
p := pipeline.New(0, nil, s.RootOp)
sender, err := s.remoteRun(c)

runErr := err
Expand Down Expand Up @@ -393,7 +393,7 @@ func (s *Scope) ParallelRun(c *Compile) (err error) {
// if codes run here, it means some error happens during build the parallel scope.
// we should do clean work for source-scope to avoid receiver hung.
if parallelScope == nil {
pipeline.NewMerge(s.RootOp, s.Reg).Cleanup(s.Proc, true, c.isPrepare, err)
pipeline.NewMerge(s.RootOp).Cleanup(s.Proc, true, c.isPrepare, err)
}
}()

Expand Down Expand Up @@ -421,6 +421,10 @@ func (s *Scope) ParallelRun(c *Compile) (err error) {
return err
}

if parallelScope != s {
setContextForParallelScope(parallelScope, s.Proc.Ctx, s.Proc.Cancel)
}

if parallelScope.Magic == Normal {
return parallelScope.Run(c)
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/compile/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,6 @@ type Scope struct {
// Proc contains the execution context.
Proc *process.Process

Reg *process.WaitRegister

RemoteReceivRegInfos []RemoteReceivRegInfo

BuildIdx int
Expand Down
20 changes: 2 additions & 18 deletions pkg/vm/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,16 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

func New(tableID uint64, attrs []string, op vm.Operator, reg *process.WaitRegister) *Pipeline {
func New(tableID uint64, attrs []string, op vm.Operator) *Pipeline {
return &Pipeline{
reg: reg,
rootOp: op,
attrs: attrs,
tableID: tableID,
}
}

func NewMerge(op vm.Operator, reg *process.WaitRegister) *Pipeline {
func NewMerge(op vm.Operator) *Pipeline {
return &Pipeline{
reg: reg,
rootOp: op,
}
}
Expand All @@ -52,7 +50,6 @@ func (p *Pipeline) String() string {
}

func (p *Pipeline) Run(r engine.Reader, topValueMsgTag int32, proc *process.Process) (end bool, err error) {
p.waitRegister()

if tableScanOperator, ok := vm.GetLeafOp(p.rootOp).(*table_scan.TableScan); ok {
tableScanOperator.Reader = r
Expand All @@ -65,7 +62,6 @@ func (p *Pipeline) Run(r engine.Reader, topValueMsgTag int32, proc *process.Proc
}

func (p *Pipeline) ConstRun(bat *batch.Batch, proc *process.Process) (end bool, err error) {
p.waitRegister()

if valueScanOperator, ok := vm.GetLeafOp(p.rootOp).(*value_scan.ValueScan); ok {
pipelineInputBatches := []*batch.Batch{bat}
Expand All @@ -79,21 +75,9 @@ func (p *Pipeline) ConstRun(bat *batch.Batch, proc *process.Process) (end bool,
}

func (p *Pipeline) MergeRun(proc *process.Process) (end bool, err error) {
p.waitRegister()

return p.run(proc)
}

func (p *Pipeline) waitRegister() {
if p.reg == nil {
return
}
select {
case <-p.reg.Ctx.Done():
case <-p.reg.Ch:
}
}

func (p *Pipeline) run(proc *process.Process) (end bool, err error) {
if err = vm.Prepare(p.rootOp, proc); err != nil {
return false, err
Expand Down
1 change: 0 additions & 1 deletion pkg/vm/pipeline/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ type Pipeline struct {
// orders to be executed
// instructions vm.Instructions
rootOp vm.Operator
reg *process.WaitRegister
}

// Cleanup do memory release work for whole pipeline.
Expand Down
25 changes: 21 additions & 4 deletions pkg/vm/process/process2.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func NewTopProcess(
return proc
}

// NewNoContextChildProc make a new child process without context field.
// This is used for the compile process, which doesn't need to pass the context.
// NewNoContextChildProc make a new child process without a context field.
// This is used for the compile-process, which doesn't need to pass the context.
func (proc *Process) NewNoContextChildProc(dataEntryCount int) *Process {
child := &Process{
Base: proc.Base,
Expand All @@ -118,6 +118,7 @@ func (proc *Process) NewNoContextChildProc(dataEntryCount int) *Process {

// NewContextChildProc make a new child and init its context field.
// This is used for parallel execution, which will make a new child process to run a pipeline directly.
// todo: I will remove this method next day, it's a waste to create a new context.
func (proc *Process) NewContextChildProc(dataEntryCount int) *Process {
child := proc.NewNoContextChildProc(dataEntryCount)
child.BuildPipelineContext(proc.Ctx)
Expand All @@ -132,10 +133,11 @@ func (proc *Process) BuildPipelineContext(parentContext context.Context) context
proc.Ctx, proc.Cancel = context.WithCancel(parentContext)

// update the context held by this process's data producers.
mp := proc.Mp()
for _, sender := range proc.Reg.MergeReceivers {
sender.Ctx = proc.Ctx
sender.CleanChannel(mp)

// do not clean the channel here, because we cannot ensure that sender was not in progress.
//sender.CleanChannel(mp)
}
return proc.Ctx
}
Expand Down Expand Up @@ -177,6 +179,21 @@ func GetQueryCtxFromProc(proc *Process) (context.Context, context.CancelFunc) {
return proc.Base.sqlContext.queryContext, proc.Base.sqlContext.queryCancel
}

// ReplacePipelineCtx replaces the pipeline context and cancel function for the process.
// It's a very dangerous operation, should be used with caution.
// And we only use it for the newly built pipeline by the pipeline's ParallelRun method.
func ReplacePipelineCtx(proc *Process, ctx context.Context, cancel context.CancelFunc) {
proc.Ctx = ctx
proc.Cancel = cancel

for _, sender := range proc.Reg.MergeReceivers {
sender.Ctx = proc.Ctx

// do not clean the channel here, because we cannot ensure that sender was not in progress.
//sender.CleanChannel(mp)
}
}

// GetQueryContextError return error once top context or query context with error.
func (proc *Process) GetQueryContextError() error {
base := proc.Base.GetContextBase()
Expand Down
23 changes: 21 additions & 2 deletions pkg/vm/process/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,29 @@ func NewRegMsg(bat *batch.Batch) *RegisterMessage {

// WaitRegister channel
type WaitRegister struct {
// Ctx, context for data receiver.
// Ctx, context of data receiver's pipeline.
//
// todo:
// This must cause a race here, because the context was shared by multiple pipelines.
//
// Assume we have two pipelines,
// pipeline1 and pipeline2, pipeline1 will dispatch data to pipeline2.
// so they share the same WaitRegister.
// and all of the receiver pipeline2 is parallel type.
//
// see the function `setContextForParallelScope` in `pkg/sql/compile/compile2.go`,
// we will rebuild pipeline context sometimes for parallel-type pipeline.
//
// If pipeline1 run first, it will listen to the context of pipeline2 from WaitRegister,
// and then pipeline2 run, it will rebuild the context, and the context of pipeline2 will be changed.
// it's a race but maybe not a problem, because the receiver never receive data before the pipeline2 run.
//
// it's a better way to use a self context but not the pipeline context here.
// and the receiver shut down the context when it's done.
Ctx context.Context
// Ch, data receiver channel, receiver will wait for data from this channel.
// Ch, data receiver's channel, receiver will wait for data from this channel.
Ch chan *RegisterMessage

// how many nil batch this channel can receive, default 0 means every nil batch close channel
NilBatchCnt int
}
Expand Down

0 comments on commit 315fdbe

Please sign in to comment.