diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 4710a40204c8..0e01d29d0473 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -587,9 +587,9 @@ func (dsp *DistSQLPlanner) setupFlows( // // In order to not protect DistSQLReceiver.status with a mutex, // we do not update the status here and, instead, rely on the - // DistSQLReceiver detecting the error the next time a meta is - // pushed into it. - recv.setErrorWithoutStatusUpdate(res.err) + // DistSQLReceiver detecting the error the next time an object + // is pushed into it. + recv.setErrorWithoutStatusUpdate(res.err, true /* willDeferStatusUpdate */) // Now explicitly cancel the local flow. flow.Cancel() } @@ -834,6 +834,11 @@ type DistSQLReceiver struct { row rowResultWriter batch batchResultWriter } + // updateStatus, if true, indicates that a concurrent goroutine has set an + // error on the rowResultWriter without updating status, so the main + // goroutine needs to update the status. + updateStatus atomic.Bool + status execinfra.ConsumerStatus stmtType tree.StatementReturnType @@ -859,7 +864,6 @@ type DistSQLReceiver struct { commErr error row tree.Datums - status execinfra.ConsumerStatus alloc tree.DatumAlloc closed bool @@ -1174,12 +1178,12 @@ func (r *DistSQLReceiver) getError() error { } // setErrorWithoutStatusUpdate sets the error in the rowResultWriter but does -// **not** update the status of the DistSQLReceiver. The status will be updated -// the next time a meta is pushed into the receiver (by the goroutine that is -// pushing). +// **not** update the status of the DistSQLReceiver. willDeferStatusUpdate +// indicates whether the main goroutine should update the status the next time +// it pushes something into the DistSQLReceiver. // // NOTE: consider using SetError() instead. -func (r *DistSQLReceiver) setErrorWithoutStatusUpdate(err error) { +func (r *DistSQLReceiver) setErrorWithoutStatusUpdate(err error, willDeferStatusUpdate bool) { r.resultWriterMu.Lock() defer r.resultWriterMu.Unlock() // Check if the error we just received should take precedence over a @@ -1204,6 +1208,7 @@ func (r *DistSQLReceiver) setErrorWithoutStatusUpdate(err error) { } } r.resultWriterMu.row.SetError(err) + r.updateStatus.Store(willDeferStatusUpdate) } } @@ -1227,20 +1232,28 @@ func (r *DistSQLReceiver) updateStatusAfterError(err error) { // // The status of DistSQLReceiver is updated accordingly. func (r *DistSQLReceiver) SetError(err error) { - r.setErrorWithoutStatusUpdate(err) + r.setErrorWithoutStatusUpdate(err, false /* willDeferStatusUpdate */) r.updateStatusAfterError(err) } +// checkConcurrentError checks whether an error has been set by another +// goroutine without updating the status. +func (r *DistSQLReceiver) checkConcurrentError() { + if r.status != execinfra.NeedMoreRows || !r.updateStatus.Load() { + // If the status already is not NeedMoreRows, then it doesn't matter if + // there was a concurrent error set. + return + } + previousErr := r.getError() + if previousErr == nil { + previousErr = errors.AssertionFailedf("unexpectedly updateStatus is set but there is no error") + } + r.updateStatusAfterError(previousErr) +} + // pushMeta takes in non-empty metadata object and pushes it to the result // writer. Possibly updated status is returned. func (r *DistSQLReceiver) pushMeta(meta *execinfrapb.ProducerMetadata) execinfra.ConsumerStatus { - // Check whether an error has been set by another goroutine without updating - // the status. - if r.status == execinfra.NeedMoreRows { - if previousErr := r.getError(); previousErr != nil { - r.updateStatusAfterError(previousErr) - } - } if metaWriter, ok := r.resultWriterMu.row.(MetadataResultWriter); ok { metaWriter.AddMeta(r.ctx, meta) } @@ -1346,13 +1359,14 @@ func (r *DistSQLReceiver) handleCommErr(commErr error) { func (r *DistSQLReceiver) Push( row rowenc.EncDatumRow, meta *execinfrapb.ProducerMetadata, ) execinfra.ConsumerStatus { + r.checkConcurrentError() if r.testingKnobs.pushCallback != nil { r.testingKnobs.pushCallback(row, meta) } if meta != nil { return r.pushMeta(meta) } - if r.ctx.Err() != nil && r.getError() == nil { + if r.ctx.Err() != nil && r.status != execinfra.ConsumerClosed { r.SetError(r.ctx.Err()) } if r.status != execinfra.NeedMoreRows { @@ -1425,10 +1439,11 @@ func (r *DistSQLReceiver) Push( func (r *DistSQLReceiver) PushBatch( batch coldata.Batch, meta *execinfrapb.ProducerMetadata, ) execinfra.ConsumerStatus { + r.checkConcurrentError() if meta != nil { return r.pushMeta(meta) } - if r.ctx.Err() != nil && r.getError() == nil { + if r.ctx.Err() != nil && r.status != execinfra.ConsumerClosed { r.SetError(r.ctx.Err()) } if r.status != execinfra.NeedMoreRows {