Skip to content

Commit

Permalink
sql: clean up ieResultChannel concepts and fix race condition
Browse files Browse the repository at this point in the history
The async and sync implementations were too close to justify two structs.
Also, the async behavior of not stopping the writer in case the reader
called close wasn't desireable. This commit unifies the implementation.
It also ensures that we propagate context errors in all cases triggered
by the closure of the done channel. It also makes closing the channel
idempotent.

Fixes #62948

Release note: None
  • Loading branch information
ajwerner authored and yuzefovich committed Apr 2, 2021
1 parent 7bc94ea commit b606d7d
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 120 deletions.
8 changes: 4 additions & 4 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,10 +733,10 @@ func (r *DistSQLReceiver) Push(
}
r.tracing.TraceExecRowsResult(r.ctx, r.row)
if commErr := r.resultWriter.AddRow(r.ctx, r.row); commErr != nil {
if errors.Is(commErr, ErrLimitedResultClosed) {
// ErrLimitedResultClosed is not a real error, it is a signal to
// stop distsql and return success to the client (that's why we
// don't set the error on the resultWriter).
if errors.Is(commErr, ErrLimitedResultClosed) || errors.Is(commErr, errIEResultChannelClosed) {
// ErrLimitedResultClosed and errIEResultChannelClosed are not real
// errors, it is a signal to stop distsql and return success to the
// client (that's why we don't set the error on the resultWriter).
r.status = execinfra.DrainRequested
} else {
// Set the error on the resultWriter too, for the convenience of some of the
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ var rowsAffectedResultColumns = colinfo.ResultColumns{
func (ie *InternalExecutor) execInternal(
ctx context.Context,
opName string,
rw ieResultChannel,
rw *ieResultChannel,
txn *kv.Txn,
sessionDataOverride sessiondata.InternalExecutorOverride,
stmt string,
Expand Down
208 changes: 93 additions & 115 deletions pkg/sql/internal_result_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@ import (
"github.com/cockroachdb/errors"
)

// ieResultChannel is used to coordinate passing results from an
// internalExecutor to its corresponding iterator.
type ieResultChannel interface {
ieResultReader
ieResultWriter
}

// ieResultReader is used to read internalExecutor results.
// It is managed by the rowsIterator.
type ieResultReader interface {
Expand All @@ -34,15 +27,13 @@ type ieResultReader interface {
// currently blocked and waits for the initial result to be written.
firstResult(ctx context.Context) (_ ieIteratorResult, done bool, err error)

// nextResult returns the nextResult. Done will always be true if err
// nextResult returns the next result. Done will always be true if err
// is non-nil. Err will be non-nil if either close has been called or
// the passed context is finished.
nextResult(ctx context.Context) (_ ieIteratorResult, done bool, err error)

// close ensures that the either writer has finished writing. In the case
// of an asynchronous channel, close will drain the writer's channel. In the
// case of the synchronous channel, it will ensure that the writer receives
// an error when it wakes.
// close ensures that either writer has finished writing. The writer will
// receive a signal to drain, and close will drain the writer's channel.
close() error
}

Expand All @@ -67,174 +58,161 @@ var asyncIEResultChannelBufferSize = util.ConstantWithMetamorphicTestRange(

// newAsyncIEResultChannel returns an ieResultChannel which does not attempt to
// synchronize the writer with the reader.
func newAsyncIEResultChannel() ieResultChannel {
return &asyncIEResultChannel{
func newAsyncIEResultChannel() *ieResultChannel {
return &ieResultChannel{
dataCh: make(chan ieIteratorResult, asyncIEResultChannelBufferSize),
doneCh: make(chan struct{}),
}
}

type asyncIEResultChannel struct {
dataCh chan ieIteratorResult
}

var _ ieResultChannel = &asyncIEResultChannel{}

func (c *asyncIEResultChannel) firstResult(
ctx context.Context,
) (_ ieIteratorResult, done bool, err error) {
select {
case <-ctx.Done():
return ieIteratorResult{}, true, ctx.Err()
case res, ok := <-c.dataCh:
if !ok {
return ieIteratorResult{}, true, nil
}
return res, false, nil
}
}

func (c *asyncIEResultChannel) nextResult(
ctx context.Context,
) (_ ieIteratorResult, done bool, err error) {
return c.firstResult(ctx)
}

func (c *asyncIEResultChannel) close() error {
var firstErr error
for {
res, done, err := c.nextResult(context.TODO())
if firstErr == nil {
if res.err != nil {
firstErr = res.err
} else if err != nil {
firstErr = err
}
}
if done {
return firstErr
}
}
}

func (c *asyncIEResultChannel) addResult(ctx context.Context, result ieIteratorResult) error {
select {
case <-ctx.Done():
return ctx.Err()
case c.dataCh <- result:
return nil
}
}

func (c *asyncIEResultChannel) finish() {
close(c.dataCh)
}

// syncIEResultChannel is used to ensure that in execution scenarios which
// do not permit concurrency that there is none. It works by blocking the
// writing goroutine immediately upon sending on the data channel and only
// unblocking it after the reader signals.
type syncIEResultChannel struct {
// ieResultChannel is used to coordinate passing results from an
// internalExecutor to its corresponding iterator. It can be constructed to
// ensure that there is no concurrency between the reader and writer.
type ieResultChannel struct {

// dataCh is the channel on which the connExecutor goroutine sends the rows
// (in addResult) and will block on waitCh after each send. The iterator
// goroutine blocks on dataCh until there is something to receive (rows or
// other metadata) and will return the data to the caller. On the next call
// to Next(), the iterator goroutine unblocks the producer and will block
// itself again. dataCh will be closed (in finish()) when the connExecutor
// goroutine exits its run() loop whereas waitCh is closed when closing the
// iterator.
// (in addResult) and, in the synchronous case, will block on waitCh after
// each send. The iterator goroutine blocks on dataCh until there is
// something to receive (rows or other metadata) and will return the data to
// the caller. On the next call to Next(), the iterator goroutine unblocks
// the producer and will block itself again. dataCh will be closed (in
// finish()) when the connExecutor goroutine exits its run() loop whereas
// waitCh is closed when closing the iterator.
dataCh chan ieIteratorResult

// waitCh is never closed. In all places where the caller may interact with it
// the doneCh is also used. This policy is in place to make it safe to unblock
// both the reader and the writer without any hazards of a blocked reader
// attempting to send on a closed channel.
// waitCh is nil for async ieResultChannels. It is never closed. In all places
// where the caller may interact with it the doneCh is also used. This policy
// is in place to make it safe to unblock both the reader and the writer
// without any hazards of a blocked reader attempting to send on a closed
// channel.
waitCh chan struct{}

// doneCh is used to indicate that the ReadWriter has been closed.
// doneCh is closed under the doneOnce. The doneCh is only used for the
// syncIEResultChannel. This is crucial to ensure that a synchronous writer
// does not attempt to continue to operate after the reader has called close.
// doneCh is used to indicate that the ieResultReader has been closed and is
// closed under the doneOnce, the writer will transition to draining. This
// is crucial to ensure that a synchronous writer does not attempt to
// continue to operate after the reader has called close.
doneCh chan struct{}
doneErr error
doneOnce sync.Once
}

var _ ieResultChannel = &syncIEResultChannel{}

// newSyncIEResultChannel returns an ieResultChannel which synchronizes the
// writer with the reader.
func newSyncIEResultChannel() ieResultChannel {
return &syncIEResultChannel{
// newSyncIEResultChannel is used to ensure that in execution scenarios which
// do not permit concurrency that there is none. It works by blocking the
// writing goroutine immediately upon sending on the data channel and only
// unblocking it after the reader signals.
func newSyncIEResultChannel() *ieResultChannel {
return &ieResultChannel{
dataCh: make(chan ieIteratorResult),
waitCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
}

func (i *syncIEResultChannel) firstResult(
func (i *ieResultChannel) firstResult(
ctx context.Context,
) (_ ieIteratorResult, done bool, err error) {
select {
case <-ctx.Done():
return ieIteratorResult{}, true, ctx.Err()
case <-i.doneCh:
return ieIteratorResult{}, true, nil
return ieIteratorResult{}, true, ctx.Err()
case res, ok := <-i.dataCh:
if !ok {
return ieIteratorResult{}, true, nil
return ieIteratorResult{}, true, ctx.Err()
}
return res, false, nil
}
}

func (i *syncIEResultChannel) unblockWriter(ctx context.Context) (done bool, err error) {
func (i *ieResultChannel) maybeUnblockWriter(ctx context.Context) (done bool, err error) {
if i.async() {
return false, nil
}
select {
case <-ctx.Done():
return true, ctx.Err()
case <-i.doneCh:
return true, nil
return true, ctx.Err()
case i.waitCh <- struct{}{}:
return false, nil
}
}

func (i *syncIEResultChannel) finish() {
close(i.dataCh)
func (i *ieResultChannel) async() bool {
return i.waitCh == nil
}

func (i *syncIEResultChannel) nextResult(
func (i *ieResultChannel) nextResult(
ctx context.Context,
) (_ ieIteratorResult, done bool, err error) {
if done, err = i.unblockWriter(ctx); done {
if done, err = i.maybeUnblockWriter(ctx); done {
return ieIteratorResult{}, done, err
}
return i.firstResult(ctx)
}

func (i *syncIEResultChannel) close() error {
i.doneOnce.Do(func() { close(i.doneCh) })
return nil
func (i *ieResultChannel) close() error {
i.doneOnce.Do(func() {
close(i.doneCh)
for {
// In the async case, res might contain some actual rows, but we're
// not interested in them; in the sync case, only errors are
// expected to be retrieved from now one because the writer
// transitions to draining.
res, done, err := i.nextResult(context.TODO())
if i.doneErr == nil {
if res.err != nil {
i.doneErr = res.err
} else if err != nil {
i.doneErr = err
}
}
if done {
return
}
}
})
return i.doneErr
}

// errSyncIEResultReaderCanceled is returned by the writer when the reader has
// closed syncIEResultChannel. The error indicates to the writer to shut down
// the query execution, but the reader won't propagate it further.
var errSyncIEResultReaderCanceled = errors.New("synchronous ieResultReader closed")
// errIEResultChannelClosed is returned by the writer when the reader has closed
// ieResultChannel. The error indicates to the writer to drain the query
// execution, but the reader won't propagate it further.
var errIEResultChannelClosed = errors.New("ieResultReader closed")

func (i *syncIEResultChannel) addResult(ctx context.Context, result ieIteratorResult) error {
func (i *ieResultChannel) addResult(ctx context.Context, result ieIteratorResult) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-i.doneCh:
return errSyncIEResultReaderCanceled
// Prefer the context error if there is one.
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
}
return errIEResultChannelClosed
case i.dataCh <- result:
}
return i.maybeBlock(ctx)
}

func (i *ieResultChannel) maybeBlock(ctx context.Context) error {
if i.async() {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-i.doneCh:
return errSyncIEResultReaderCanceled
// Prefer the context error if there is one.
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
}
return errIEResultChannelClosed
case <-i.waitCh:
return nil
}
}

func (i *ieResultChannel) finish() {
close(i.dataCh)
}

0 comments on commit b606d7d

Please sign in to comment.