From a3c8d59000f51edd15ae25c5a797f5a6d0c0ea9c Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 10 Feb 2021 07:30:34 -0800 Subject: [PATCH] sql: fix a race in the internal executor The recent change to make the internal executor streaming introduced a race between `initConnEx` returning (and, thus, assigning a stmtBuf value) and the connExecutor's goroutine being spawn up. As a result, the assumption of the callbacks that the stmtBuf is non-nil might not be satisfied. This commit fixes the issue by creating the objects in question before `initConnEx` is called (which is also before the connExecutor goroutine starts). Release note: None --- pkg/sql/internal.go | 37 +++++++++++++------------------------ 1 file changed, 13 insertions(+), 24 deletions(-) diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index ca8adf2c3e0d..a219e46993ba 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -125,8 +125,8 @@ func (ie *InternalExecutor) SetSessionData(sessionData *sessiondata.SessionData) } // initConnEx creates a connExecutor and runs it on a separate goroutine. It -// returns a StmtBuf into which commands can be pushed and a WaitGroup that will -// be signaled when connEx.run() returns. +// takes in a StmtBuf into which commands can be pushed and a WaitGroup that +// will be signaled when connEx.run() returns. // // If txn is not nil, the statement will be executed in the respective txn. // @@ -139,9 +139,11 @@ func (ie *InternalExecutor) initConnEx( txn *kv.Txn, ch chan ieIteratorResult, sd *sessiondata.SessionData, + stmtBuf *StmtBuf, + wg *sync.WaitGroup, syncCallback func([]resWithPos), errCallback func(error), -) (*StmtBuf, *sync.WaitGroup) { +) { clientComm := &internalClientComm{ ch: ch, // init lastDelivered below the position of the first result (0). @@ -166,7 +168,6 @@ func (ie *InternalExecutor) initConnEx( } appStats := ie.s.sqlStats.getStatsForApplication(appStatsBucketName) - stmtBuf := NewStmtBuf() var ex *connExecutor if txn == nil { ex = ie.s.newConnExecutor( @@ -196,7 +197,6 @@ func (ie *InternalExecutor) initConnEx( } ex.executorType = executorTypeInternal - var wg sync.WaitGroup wg.Add(1) go func() { if err := ex.run(ctx, ie.mon, mon.BoundAccount{} /*reserved*/, nil /* cancel */); err != nil { @@ -211,7 +211,6 @@ func (ie *InternalExecutor) initConnEx( ex.close(ctx, closeMode) wg.Done() }() - return stmtBuf, &wg } type ieIteratorResult struct { @@ -631,8 +630,8 @@ func (ie *InternalExecutor) execInternal( // exits before the span is finished. ctx, sp := tracing.EnsureChildSpan(ctx, ie.s.cfg.AmbientCtx.Tracer, opName) - var stmtBuf *StmtBuf - var wg *sync.WaitGroup + stmtBuf := NewStmtBuf() + var wg sync.WaitGroup defer func() { // We wrap errors with the opName, but not if they're retriable - in that @@ -645,20 +644,8 @@ func (ie *InternalExecutor) execInternal( if !errIsRetriable(retErr) { retErr = errors.Wrapf(retErr, "%s", opName) } - if stmtBuf != nil { - // If stmtBuf is non-nil, then the connExecutor goroutine has - // been spawn up - we gotta wait for it to exit. - // - // Note that at the moment of writing when retErr is non-nil, - // the stmtBuf is necessarily nil (the only errors emitted after - // the connExecutor is initialized are the errors on pushing - // into the stmtBuf, and those could occur only if the stmtBuf - // is closed which would indicate problems with - // synchronization). In any case, we want to be safe and handle - // such a scenario accordingly. - stmtBuf.Close() - wg.Wait() - } + stmtBuf.Close() + wg.Wait() sp.Finish() } else { // r must be non-nil here. @@ -709,13 +696,15 @@ func (ie *InternalExecutor) execInternal( } ch <- ieIteratorResult{err: errors.AssertionFailedf("missing result for pos: %d and no previous error", resPos)} } + // errCallback is called if an error is returned from the connExecutor's + // run() loop. errCallback := func(err error) { // The connExecutor exited its run() loop, so the stmtBuf must have been // closed. Still, since Close() is idempotent, we'll call it here too. stmtBuf.Close() ch <- ieIteratorResult{err: err} } - stmtBuf, wg = ie.initConnEx(ctx, txn, ch, sd, syncCallback, errCallback) + ie.initConnEx(ctx, txn, ch, sd, stmtBuf, &wg, syncCallback, errCallback) typeHints := make(tree.PlaceholderTypes, len(datums)) for i, d := range datums { @@ -768,7 +757,7 @@ func (ie *InternalExecutor) execInternal( ch: ch, resultCols: resultColumns, stmtBuf: stmtBuf, - wg: wg, + wg: &wg, }, nil }