From d8f85e2d58c1d50d99956b2a4ad2c1cb477b00f1 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Thu, 1 Apr 2021 22:49:31 -0400 Subject: [PATCH] sql: clean up ieResultChannel concepts and fix race condition The async and sync implementations were too close to justify two structs. Also, the async behavior of not stopping the writer in case the reader called close wasn't desireable. This commit unifies the implementation. It also ensures that we propagate context errors in all cases triggered by the closure of the done channel. It also makes closing the channel idempotent. Additionally, this commit transitions the execution flow into draining state without setting our customer error on the resultWriter. Release note: None --- pkg/sql/distsql_running.go | 8 +- pkg/sql/internal.go | 2 +- pkg/sql/internal_result_channel.go | 208 +++++++++++++---------------- pkg/sql/user_test.go | 2 - 4 files changed, 98 insertions(+), 122 deletions(-) diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 539cfc891da7..73e9e3d77109 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -738,10 +738,10 @@ func (r *DistSQLReceiver) Push( } r.tracing.TraceExecRowsResult(r.ctx, r.row) if commErr := r.resultWriter.AddRow(r.ctx, r.row); commErr != nil { - if errors.Is(commErr, ErrLimitedResultClosed) { - // ErrLimitedResultClosed is not a real error, it is a signal to - // stop distsql and return success to the client (that's why we - // don't set the error on the resultWriter). + if errors.Is(commErr, ErrLimitedResultClosed) || errors.Is(commErr, errIEResultChannelClosed) { + // ErrLimitedResultClosed and errIEResultChannelClosed are not real + // errors, it is a signal to stop distsql and return success to the + // client (that's why we don't set the error on the resultWriter). r.status = execinfra.DrainRequested } else { // Set the error on the resultWriter to notify the consumer about diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 173fbf8e5ef5..e6411c1ea510 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -604,7 +604,7 @@ var rowsAffectedResultColumns = colinfo.ResultColumns{ func (ie *InternalExecutor) execInternal( ctx context.Context, opName string, - rw ieResultChannel, + rw *ieResultChannel, txn *kv.Txn, sessionDataOverride sessiondata.InternalExecutorOverride, stmt string, diff --git a/pkg/sql/internal_result_channel.go b/pkg/sql/internal_result_channel.go index 33a41540d3d7..d8e4a282dddc 100644 --- a/pkg/sql/internal_result_channel.go +++ b/pkg/sql/internal_result_channel.go @@ -18,13 +18,6 @@ import ( "github.com/cockroachdb/errors" ) -// ieResultChannel is used to coordinate passing results from an -// internalExecutor to its corresponding iterator. -type ieResultChannel interface { - ieResultReader - ieResultWriter -} - // ieResultReader is used to read internalExecutor results. // It is managed by the rowsIterator. type ieResultReader interface { @@ -34,15 +27,13 @@ type ieResultReader interface { // currently blocked and waits for the initial result to be written. firstResult(ctx context.Context) (_ ieIteratorResult, done bool, err error) - // nextResult returns the nextResult. Done will always be true if err + // nextResult returns the next result. Done will always be true if err // is non-nil. Err will be non-nil if either close has been called or // the passed context is finished. nextResult(ctx context.Context) (_ ieIteratorResult, done bool, err error) - // close ensures that the either writer has finished writing. In the case - // of an asynchronous channel, close will drain the writer's channel. In the - // case of the synchronous channel, it will ensure that the writer receives - // an error when it wakes. + // close ensures that either writer has finished writing. The writer will + // receive a signal to drain, and close will drain the writer's channel. close() error } @@ -67,174 +58,161 @@ var asyncIEResultChannelBufferSize = util.ConstantWithMetamorphicTestRange( // newAsyncIEResultChannel returns an ieResultChannel which does not attempt to // synchronize the writer with the reader. -func newAsyncIEResultChannel() ieResultChannel { - return &asyncIEResultChannel{ +func newAsyncIEResultChannel() *ieResultChannel { + return &ieResultChannel{ dataCh: make(chan ieIteratorResult, asyncIEResultChannelBufferSize), + doneCh: make(chan struct{}), } } -type asyncIEResultChannel struct { - dataCh chan ieIteratorResult -} - -var _ ieResultChannel = &asyncIEResultChannel{} - -func (c *asyncIEResultChannel) firstResult( - ctx context.Context, -) (_ ieIteratorResult, done bool, err error) { - select { - case <-ctx.Done(): - return ieIteratorResult{}, true, ctx.Err() - case res, ok := <-c.dataCh: - if !ok { - return ieIteratorResult{}, true, nil - } - return res, false, nil - } -} - -func (c *asyncIEResultChannel) nextResult( - ctx context.Context, -) (_ ieIteratorResult, done bool, err error) { - return c.firstResult(ctx) -} - -func (c *asyncIEResultChannel) close() error { - var firstErr error - for { - res, done, err := c.nextResult(context.TODO()) - if firstErr == nil { - if res.err != nil { - firstErr = res.err - } else if err != nil { - firstErr = err - } - } - if done { - return firstErr - } - } -} - -func (c *asyncIEResultChannel) addResult(ctx context.Context, result ieIteratorResult) error { - select { - case <-ctx.Done(): - return ctx.Err() - case c.dataCh <- result: - return nil - } -} - -func (c *asyncIEResultChannel) finish() { - close(c.dataCh) -} - -// syncIEResultChannel is used to ensure that in execution scenarios which -// do not permit concurrency that there is none. It works by blocking the -// writing goroutine immediately upon sending on the data channel and only -// unblocking it after the reader signals. -type syncIEResultChannel struct { +// ieResultChannel is used to coordinate passing results from an +// internalExecutor to its corresponding iterator. It can be constructed to +// ensure that there is no concurrency between the reader and writer. +type ieResultChannel struct { // dataCh is the channel on which the connExecutor goroutine sends the rows - // (in addResult) and will block on waitCh after each send. The iterator - // goroutine blocks on dataCh until there is something to receive (rows or - // other metadata) and will return the data to the caller. On the next call - // to Next(), the iterator goroutine unblocks the producer and will block - // itself again. dataCh will be closed (in finish()) when the connExecutor - // goroutine exits its run() loop whereas waitCh is closed when closing the - // iterator. + // (in addResult) and, in the synchronous case, will block on waitCh after + // each send. The iterator goroutine blocks on dataCh until there is + // something to receive (rows or other metadata) and will return the data to + // the caller. On the next call to Next(), the iterator goroutine unblocks + // the producer and will block itself again. dataCh will be closed (in + // finish()) when the connExecutor goroutine exits its run() loop whereas + // waitCh is closed when closing the iterator. dataCh chan ieIteratorResult - // waitCh is never closed. In all places where the caller may interact with it - // the doneCh is also used. This policy is in place to make it safe to unblock - // both the reader and the writer without any hazards of a blocked reader - // attempting to send on a closed channel. + // waitCh is nil for async ieResultChannels. It is never closed. In all places + // where the caller may interact with it the doneCh is also used. This policy + // is in place to make it safe to unblock both the reader and the writer + // without any hazards of a blocked reader attempting to send on a closed + // channel. waitCh chan struct{} - // doneCh is used to indicate that the ReadWriter has been closed. - // doneCh is closed under the doneOnce. The doneCh is only used for the - // syncIEResultChannel. This is crucial to ensure that a synchronous writer - // does not attempt to continue to operate after the reader has called close. + // doneCh is used to indicate that the ieResultReader has been closed and is + // closed under the doneOnce, the writer will transition to draining. This + // is crucial to ensure that a synchronous writer does not attempt to + // continue to operate after the reader has called close. doneCh chan struct{} + doneErr error doneOnce sync.Once } -var _ ieResultChannel = &syncIEResultChannel{} - -// newSyncIEResultChannel returns an ieResultChannel which synchronizes the -// writer with the reader. -func newSyncIEResultChannel() ieResultChannel { - return &syncIEResultChannel{ +// newSyncIEResultChannel is used to ensure that in execution scenarios which +// do not permit concurrency that there is none. It works by blocking the +// writing goroutine immediately upon sending on the data channel and only +// unblocking it after the reader signals. +func newSyncIEResultChannel() *ieResultChannel { + return &ieResultChannel{ dataCh: make(chan ieIteratorResult), waitCh: make(chan struct{}), doneCh: make(chan struct{}), } } -func (i *syncIEResultChannel) firstResult( +func (i *ieResultChannel) firstResult( ctx context.Context, ) (_ ieIteratorResult, done bool, err error) { select { case <-ctx.Done(): return ieIteratorResult{}, true, ctx.Err() case <-i.doneCh: - return ieIteratorResult{}, true, nil + return ieIteratorResult{}, true, ctx.Err() case res, ok := <-i.dataCh: if !ok { - return ieIteratorResult{}, true, nil + return ieIteratorResult{}, true, ctx.Err() } return res, false, nil } } -func (i *syncIEResultChannel) unblockWriter(ctx context.Context) (done bool, err error) { +func (i *ieResultChannel) maybeUnblockWriter(ctx context.Context) (done bool, err error) { + if i.async() { + return false, nil + } select { case <-ctx.Done(): return true, ctx.Err() case <-i.doneCh: - return true, nil + return true, ctx.Err() case i.waitCh <- struct{}{}: return false, nil } } -func (i *syncIEResultChannel) finish() { - close(i.dataCh) +func (i *ieResultChannel) async() bool { + return i.waitCh == nil } -func (i *syncIEResultChannel) nextResult( +func (i *ieResultChannel) nextResult( ctx context.Context, ) (_ ieIteratorResult, done bool, err error) { - if done, err = i.unblockWriter(ctx); done { + if done, err = i.maybeUnblockWriter(ctx); done { return ieIteratorResult{}, done, err } return i.firstResult(ctx) } -func (i *syncIEResultChannel) close() error { - i.doneOnce.Do(func() { close(i.doneCh) }) - return nil +func (i *ieResultChannel) close() error { + i.doneOnce.Do(func() { + close(i.doneCh) + for { + // In the async case, res might contain some actual rows, but we're + // not interested in them; in the sync case, only errors are + // expected to be retrieved from now one because the writer + // transitions to draining. + res, done, err := i.nextResult(context.TODO()) + if i.doneErr == nil { + if res.err != nil { + i.doneErr = res.err + } else if err != nil { + i.doneErr = err + } + } + if done { + return + } + } + }) + return i.doneErr } -// errSyncIEResultReaderCanceled is returned by the writer when the reader has -// closed syncIEResultChannel. The error indicates to the writer to shut down -// the query execution, but the reader won't propagate it further. -var errSyncIEResultReaderCanceled = errors.New("synchronous ieResultReader closed") +// errIEResultChannelClosed is returned by the writer when the reader has closed +// ieResultChannel. The error indicates to the writer to drain the query +// execution, but the reader won't propagate it further. +var errIEResultChannelClosed = errors.New("ieResultReader closed") -func (i *syncIEResultChannel) addResult(ctx context.Context, result ieIteratorResult) error { +func (i *ieResultChannel) addResult(ctx context.Context, result ieIteratorResult) error { select { case <-ctx.Done(): return ctx.Err() case <-i.doneCh: - return errSyncIEResultReaderCanceled + // Prefer the context error if there is one. + if ctxErr := ctx.Err(); ctxErr != nil { + return ctxErr + } + return errIEResultChannelClosed case i.dataCh <- result: } + return i.maybeBlock(ctx) +} + +func (i *ieResultChannel) maybeBlock(ctx context.Context) error { + if i.async() { + return nil + } select { case <-ctx.Done(): return ctx.Err() case <-i.doneCh: - return errSyncIEResultReaderCanceled + // Prefer the context error if there is one. + if ctxErr := ctx.Err(); ctxErr != nil { + return ctxErr + } + return errIEResultChannelClosed case <-i.waitCh: return nil } } + +func (i *ieResultChannel) finish() { + close(i.dataCh) +} diff --git a/pkg/sql/user_test.go b/pkg/sql/user_test.go index aa884f7ffef2..7e277a7c9faf 100644 --- a/pkg/sql/user_test.go +++ b/pkg/sql/user_test.go @@ -45,8 +45,6 @@ func TestGetUserHashedPasswordTimeout(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 62948 /* githubIssueID */) - // We want to use a low timeout below to prevent // this test from taking forever, however // race builds are so slow as to trigger this timeout spuriously.