Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
28212: distsql: keep track of whether a flow has started goroutines r=asubiotto a=asubiotto

This enables callers of `Wait` to avoid the overhead of goroutine
creation + select when unnecessary. CPU profiling showed around 160ms of
CPU time was spent in `flow.Wait` during a run of `BenchmarkFlowSetup`.

Release note: None

cc @RaduBerinde 

Co-authored-by: Alfonso Subiotto Marqués <[email protected]>
  • Loading branch information
craig[bot] and asubiotto committed Aug 10, 2018
2 parents 9394dc6 + a003a49 commit 27cecbd
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
10 changes: 10 additions & 0 deletions pkg/sql/distsqlrun/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ type Flow struct {

localProcessors []LocalProcessor

// startedGoroutines specifies whether this flow started any goroutines. This
// is used in Wait() to avoid the overhead of waiting for non-existent
// goroutines.
startedGoroutines bool

localStreams map[StreamID]RowReceiver

// inboundStreams are streams that receive data from other hosts; this map
Expand Down Expand Up @@ -535,6 +540,7 @@ func (f *Flow) startInternal(ctx context.Context, doneFn func()) error {
f.waitGroup.Add(1)
go f.processors[i].Run(ctx, &f.waitGroup)
}
f.startedGoroutines = len(f.startables) > 0 || len(f.processors) > 1 || !f.isLocal()
return nil
}

Expand Down Expand Up @@ -562,6 +568,7 @@ func (f *Flow) StartAsync(ctx context.Context, doneFn func()) error {
if len(f.processors) > 0 {
f.waitGroup.Add(1)
go f.processors[len(f.processors)-1].Run(ctx, &f.waitGroup)
f.startedGoroutines = true
}
return nil
}
Expand All @@ -588,6 +595,9 @@ func (f *Flow) StartSync(ctx context.Context, doneFn func()) error {
// Wait waits for all the goroutines for this flow to exit. If the context gets
// canceled before all goroutines exit, it calls f.cancel().
func (f *Flow) Wait() {
if !f.startedGoroutines {
return
}
waitChan := make(chan struct{})

go func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ func (ds *ServerImpl) RunSyncFlow(stream DistSQL_RunSyncFlowServer) error {
if err := ds.Stopper.RunTask(ctx, "distsqlrun.ServerImpl: sync flow", func(ctx context.Context) {
ctx, ctxCancel := contextutil.WithCancel(ctx)
defer ctxCancel()
mbox.start(ctx, &f.waitGroup, ctxCancel)
f.startables = append(f.startables, mbox)
ds.Metrics.FlowStart()
if err := f.StartSync(ctx, func() {}); err != nil {
log.Fatalf(ctx, "unexpected error from syncFlow.Start(): %s "+
Expand Down

0 comments on commit 27cecbd

Please sign in to comment.