Skip to content

Commit

Permalink
Merge #60425
Browse files Browse the repository at this point in the history
60425: sql: fix a race in the internal executor r=yuzefovich a=yuzefovich

The recent change to make the internal executor streaming introduced
a race between `initConnEx` returning (and, thus, assigning a stmtBuf
value) and the connExecutor's goroutine being spawn up. As a result, the
assumption of the callbacks that the stmtBuf is non-nil might not be
satisfied. This commit fixes the issue by creating the objects in
question before `initConnEx` is called (which is also before the
connExecutor goroutine starts).

Fixes: #60423.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Feb 10, 2021
2 parents 1577cac + a3c8d59 commit ffcd641
Showing 1 changed file with 13 additions and 24 deletions.
37 changes: 13 additions & 24 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ func (ie *InternalExecutor) SetSessionData(sessionData *sessiondata.SessionData)
}

// initConnEx creates a connExecutor and runs it on a separate goroutine. It
// returns a StmtBuf into which commands can be pushed and a WaitGroup that will
// be signaled when connEx.run() returns.
// takes in a StmtBuf into which commands can be pushed and a WaitGroup that
// will be signaled when connEx.run() returns.
//
// If txn is not nil, the statement will be executed in the respective txn.
//
Expand All @@ -139,9 +139,11 @@ func (ie *InternalExecutor) initConnEx(
txn *kv.Txn,
ch chan ieIteratorResult,
sd *sessiondata.SessionData,
stmtBuf *StmtBuf,
wg *sync.WaitGroup,
syncCallback func([]resWithPos),
errCallback func(error),
) (*StmtBuf, *sync.WaitGroup) {
) {
clientComm := &internalClientComm{
ch: ch,
// init lastDelivered below the position of the first result (0).
Expand All @@ -166,7 +168,6 @@ func (ie *InternalExecutor) initConnEx(
}
appStats := ie.s.sqlStats.getStatsForApplication(appStatsBucketName)

stmtBuf := NewStmtBuf()
var ex *connExecutor
if txn == nil {
ex = ie.s.newConnExecutor(
Expand Down Expand Up @@ -196,7 +197,6 @@ func (ie *InternalExecutor) initConnEx(
}
ex.executorType = executorTypeInternal

var wg sync.WaitGroup
wg.Add(1)
go func() {
if err := ex.run(ctx, ie.mon, mon.BoundAccount{} /*reserved*/, nil /* cancel */); err != nil {
Expand All @@ -211,7 +211,6 @@ func (ie *InternalExecutor) initConnEx(
ex.close(ctx, closeMode)
wg.Done()
}()
return stmtBuf, &wg
}

type ieIteratorResult struct {
Expand Down Expand Up @@ -631,8 +630,8 @@ func (ie *InternalExecutor) execInternal(
// exits before the span is finished.
ctx, sp := tracing.EnsureChildSpan(ctx, ie.s.cfg.AmbientCtx.Tracer, opName)

var stmtBuf *StmtBuf
var wg *sync.WaitGroup
stmtBuf := NewStmtBuf()
var wg sync.WaitGroup

defer func() {
// We wrap errors with the opName, but not if they're retriable - in that
Expand All @@ -645,20 +644,8 @@ func (ie *InternalExecutor) execInternal(
if !errIsRetriable(retErr) {
retErr = errors.Wrapf(retErr, "%s", opName)
}
if stmtBuf != nil {
// If stmtBuf is non-nil, then the connExecutor goroutine has
// been spawn up - we gotta wait for it to exit.
//
// Note that at the moment of writing when retErr is non-nil,
// the stmtBuf is necessarily nil (the only errors emitted after
// the connExecutor is initialized are the errors on pushing
// into the stmtBuf, and those could occur only if the stmtBuf
// is closed which would indicate problems with
// synchronization). In any case, we want to be safe and handle
// such a scenario accordingly.
stmtBuf.Close()
wg.Wait()
}
stmtBuf.Close()
wg.Wait()
sp.Finish()
} else {
// r must be non-nil here.
Expand Down Expand Up @@ -709,13 +696,15 @@ func (ie *InternalExecutor) execInternal(
}
ch <- ieIteratorResult{err: errors.AssertionFailedf("missing result for pos: %d and no previous error", resPos)}
}
// errCallback is called if an error is returned from the connExecutor's
// run() loop.
errCallback := func(err error) {
// The connExecutor exited its run() loop, so the stmtBuf must have been
// closed. Still, since Close() is idempotent, we'll call it here too.
stmtBuf.Close()
ch <- ieIteratorResult{err: err}
}
stmtBuf, wg = ie.initConnEx(ctx, txn, ch, sd, syncCallback, errCallback)
ie.initConnEx(ctx, txn, ch, sd, stmtBuf, &wg, syncCallback, errCallback)

typeHints := make(tree.PlaceholderTypes, len(datums))
for i, d := range datums {
Expand Down Expand Up @@ -768,7 +757,7 @@ func (ie *InternalExecutor) execInternal(
ch: ch,
resultCols: resultColumns,
stmtBuf: stmtBuf,
wg: wg,
wg: &wg,
}, nil
}

Expand Down

0 comments on commit ffcd641

Please sign in to comment.