Skip to content

Commit

Permalink
sql: fix recent racy regression
Browse files Browse the repository at this point in the history
This commit fixes an issue that was introduced in 0c1095e
where we could set an error on the DistSQLReceiver from the concurrent
goroutine while the main goroutine would proceed on pushing. If the
next object it pushes is not a metadata (i.e. either a row or a batch),
this would result in a crash.  The issue is that we mistakenly assumed
that updating the status of the receiver only on metadata objects would
be sufficient, but it's not since there is a race between the context
cancellation propagation across the local flow and the row / batch
being pushed to the client.

This commit fixes the issue by introducing an atomic boolean that
indicates whether the receiver's status needs to be updated. The
concurrent goroutine now sets the error as well as this atomic boolean,
and the main goroutine now checks the boolean on each push and updates
the status if necessary. With this setup we avoid acquiring the mutex
on each push call and only do an atomic read (which is supposed to be
faster).

Release note: None
  • Loading branch information
yuzefovich committed Dec 12, 2022
1 parent 4b3f582 commit 3b3bf2f
Showing 1 changed file with 33 additions and 18 deletions.
51 changes: 33 additions & 18 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,9 +587,9 @@ func (dsp *DistSQLPlanner) setupFlows(
//
// In order to not protect DistSQLReceiver.status with a mutex,
// we do not update the status here and, instead, rely on the
// DistSQLReceiver detecting the error the next time a meta is
// pushed into it.
recv.setErrorWithoutStatusUpdate(res.err)
// DistSQLReceiver detecting the error the next time an object
// is pushed into it.
recv.setErrorWithoutStatusUpdate(res.err, true /* willDeferStatusUpdate */)
// Now explicitly cancel the local flow.
flow.Cancel()
}
Expand Down Expand Up @@ -834,6 +834,11 @@ type DistSQLReceiver struct {
row rowResultWriter
batch batchResultWriter
}
// updateStatus, if true, indicates that a concurrent goroutine has set an
// error on the rowResultWriter without updating status, so the main
// goroutine needs to update the status.
updateStatus atomic.Bool
status execinfra.ConsumerStatus

stmtType tree.StatementReturnType

Expand All @@ -859,7 +864,6 @@ type DistSQLReceiver struct {
commErr error

row tree.Datums
status execinfra.ConsumerStatus
alloc tree.DatumAlloc
closed bool

Expand Down Expand Up @@ -1174,12 +1178,12 @@ func (r *DistSQLReceiver) getError() error {
}

// setErrorWithoutStatusUpdate sets the error in the rowResultWriter but does
// **not** update the status of the DistSQLReceiver. The status will be updated
// the next time a meta is pushed into the receiver (by the goroutine that is
// pushing).
// **not** update the status of the DistSQLReceiver. willDeferStatusUpdate
// indicates whether the main goroutine should update the status the next time
// it pushes something into the DistSQLReceiver.
//
// NOTE: consider using SetError() instead.
func (r *DistSQLReceiver) setErrorWithoutStatusUpdate(err error) {
func (r *DistSQLReceiver) setErrorWithoutStatusUpdate(err error, willDeferStatusUpdate bool) {
r.resultWriterMu.Lock()
defer r.resultWriterMu.Unlock()
// Check if the error we just received should take precedence over a
Expand All @@ -1204,6 +1208,7 @@ func (r *DistSQLReceiver) setErrorWithoutStatusUpdate(err error) {
}
}
r.resultWriterMu.row.SetError(err)
r.updateStatus.Store(willDeferStatusUpdate)
}
}

Expand All @@ -1227,20 +1232,28 @@ func (r *DistSQLReceiver) updateStatusAfterError(err error) {
//
// The status of DistSQLReceiver is updated accordingly.
func (r *DistSQLReceiver) SetError(err error) {
r.setErrorWithoutStatusUpdate(err)
r.setErrorWithoutStatusUpdate(err, false /* willDeferStatusUpdate */)
r.updateStatusAfterError(err)
}

// checkConcurrentError sets the status if an error has been set by another
// goroutine that did not also update the status.
func (r *DistSQLReceiver) checkConcurrentError() {
if r.status != execinfra.NeedMoreRows || !r.updateStatus.Load() {
// If the status already is not NeedMoreRows, then it doesn't matter if
// there was a concurrent error set.
return
}
previousErr := r.getError()
if previousErr == nil {
previousErr = errors.AssertionFailedf("unexpectedly updateStatus is set but there is no error")
}
r.updateStatusAfterError(previousErr)
}

// pushMeta takes in non-empty metadata object and pushes it to the result
// writer. Possibly updated status is returned.
func (r *DistSQLReceiver) pushMeta(meta *execinfrapb.ProducerMetadata) execinfra.ConsumerStatus {
// Check whether an error has been set by another goroutine without updating
// the status.
if r.status == execinfra.NeedMoreRows {
if previousErr := r.getError(); previousErr != nil {
r.updateStatusAfterError(previousErr)
}
}
if metaWriter, ok := r.resultWriterMu.row.(MetadataResultWriter); ok {
metaWriter.AddMeta(r.ctx, meta)
}
Expand Down Expand Up @@ -1346,13 +1359,14 @@ func (r *DistSQLReceiver) handleCommErr(commErr error) {
func (r *DistSQLReceiver) Push(
row rowenc.EncDatumRow, meta *execinfrapb.ProducerMetadata,
) execinfra.ConsumerStatus {
r.checkConcurrentError()
if r.testingKnobs.pushCallback != nil {
r.testingKnobs.pushCallback(row, meta)
}
if meta != nil {
return r.pushMeta(meta)
}
if r.ctx.Err() != nil && r.getError() == nil {
if r.ctx.Err() != nil && r.status != execinfra.ConsumerClosed {
r.SetError(r.ctx.Err())
}
if r.status != execinfra.NeedMoreRows {
Expand Down Expand Up @@ -1425,10 +1439,11 @@ func (r *DistSQLReceiver) Push(
func (r *DistSQLReceiver) PushBatch(
batch coldata.Batch, meta *execinfrapb.ProducerMetadata,
) execinfra.ConsumerStatus {
r.checkConcurrentError()
if meta != nil {
return r.pushMeta(meta)
}
if r.ctx.Err() != nil && r.getError() == nil {
if r.ctx.Err() != nil && r.status != execinfra.ConsumerClosed {
r.SetError(r.ctx.Err())
}
if r.status != execinfra.NeedMoreRows {
Expand Down

0 comments on commit 3b3bf2f

Please sign in to comment.