Skip to content

Commit

Permalink
Merge #51375 #51520
Browse files Browse the repository at this point in the history
51375: rowexec: add test to verify uncertainty errors are properly returned r=yuzefovich a=asubiotto

This PR fixes a bunch of instances that would swallow uncertainty errors in the vectorized engine and adds a test for this. This also uncovered a query termination edge case in the vectorized Inbox we weren't handling properly.

Release note (bug fix): fix edge cases where the vectorized execution engine would drop some results under contention

51520: sql: fix UPDATE for partial indexes with non-indexed predicate columns r=RaduBerinde a=mgartner

This commit fixes a bug where an UPDATE would not correctly add or
remove entries to partial indexes when updating a column present in the
partial index predicate, but not indexed or stored.

To prevent regressions this commit adds similar tests for INSERT and
DELETE.

Release note: None

Co-authored-by: Alfonso Subiotto Marques <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
  • Loading branch information
3 people committed Jul 16, 2020
3 parents dc736e8 + 4ab55c5 + 1b1611b commit af0031e
Show file tree
Hide file tree
Showing 14 changed files with 531 additions and 157 deletions.
6 changes: 5 additions & 1 deletion pkg/sql/colexec/columnarizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,12 @@ func (c *Columnarizer) Next(context.Context) coldata.Batch {
for ; nRows < coldata.BatchSize(); nRows++ {
row, meta := c.input.Next()
if meta != nil {
c.accumulatedMeta = append(c.accumulatedMeta, *meta)
nRows--
if meta.Err != nil {
// If an error occurs, return it immediately.
colexecerror.ExpectedError(meta.Err)
}
c.accumulatedMeta = append(c.accumulatedMeta, *meta)
continue
}
if row == nil {
Expand Down
7 changes: 4 additions & 3 deletions pkg/sql/colexec/columnarizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -91,14 +92,14 @@ func TestColumnarizerDrainsAndClosesInput(t *testing.T) {

// If error metadata is obtained through this Next call, the Columnarizer
// propagates it as an error from Next rather than buffering it for DrainMeta.
_ = c.Next(ctx)
err = colexecerror.CatchVectorizedRuntimeError(func() { c.Next(ctx) })
require.True(t, testutils.IsError(err, errMsg), "unexpected error %v", err)

// Calling DrainMeta from the vectorized execution engine should propagate to
// non-vectorized components as calling ConsumerDone and then draining their
// metadata.
meta := c.DrainMeta(ctx)
require.True(t, len(meta) == 1)
require.True(t, testutils.IsError(meta[0].Err, errMsg))
require.True(t, len(meta) == 0)
require.True(t, rb.Done)
require.Equal(t, execinfra.DrainRequested, rb.ConsumerStatus, "unexpected consumer status %d", rb.ConsumerStatus)
// Closing the Columnarizer should call ConsumerClosed on the processor.
Expand Down
98 changes: 59 additions & 39 deletions pkg/sql/colexec/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,16 @@ type routerOutput interface {
// the output. It returns whether or not the output changed its state to
// blocked (see implementations).
addBatch(context.Context, coldata.Batch, []int) bool
// cancel tells the output to stop producing batches.
cancel(ctx context.Context)
// cancel tells the output to stop producing batches. Optionally forwards an
// error if not nil.
cancel(context.Context, error)
// forwardErr forwards an error to the output. The output should call
// colexecerror.ExpectedError with this error on the next call to Next.
// Calling forwardErr multiple times will result in the most recent error
// overwriting the previous error.
forwardErr(error)
// resetForTests resets the routerOutput for a benchmark or test run.
resetForTests(context.Context)
}

// getDefaultRouterOutputBlockedThreshold returns the number of unread values
Expand Down Expand Up @@ -78,7 +86,7 @@ type drainCoordinator interface {
// encounteredError should be called when a routerOutput encounters an error.
// This terminates execution. No locks should be held when calling this
// method, since cancellation could occur.
encounteredError(context.Context, error)
encounteredError(context.Context)
// drainMeta should be called exactly once when the routerOutput moves to
// draining.
drainMeta() []execinfrapb.ProducerMetadata
Expand All @@ -100,6 +108,9 @@ type routerOutputOp struct {
mu struct {
syncutil.Mutex
state routerOutputOpState
// forwardedErr is an error that was forwarded by the HashRouter. If set,
// any subsequent calls to Next will return this error.
forwardedErr error
// unlimitedAllocator tracks the memory usage of this router output,
// providing a signal for when it should spill to disk.
// The memory lifecycle is as follows:
Expand Down Expand Up @@ -245,7 +256,7 @@ func (o *routerOutputOp) nextErrorLocked(ctx context.Context, err error) {
o.maybeUnblockLocked()
// Unlock the mutex, since the HashRouter will cancel all outputs.
o.mu.Unlock()
o.drainCoordinator.encounteredError(ctx, err)
o.drainCoordinator.encounteredError(ctx)
o.mu.Lock()
colexecerror.InternalError(err)
}
Expand All @@ -256,10 +267,13 @@ func (o *routerOutputOp) nextErrorLocked(ctx context.Context, err error) {
func (o *routerOutputOp) Next(ctx context.Context) coldata.Batch {
o.mu.Lock()
defer o.mu.Unlock()
for o.mu.state == routerOutputOpRunning && o.mu.pendingBatch == nil && o.mu.data.empty() {
for o.mu.forwardedErr == nil && o.mu.state == routerOutputOpRunning && o.mu.pendingBatch == nil && o.mu.data.empty() {
// Wait until there is data to read or the output is canceled.
o.mu.cond.Wait()
}
if o.mu.forwardedErr != nil {
colexecerror.ExpectedError(o.mu.forwardedErr)
}
if o.mu.state == routerOutputOpDraining {
return coldata.ZeroBatch
}
Expand Down Expand Up @@ -325,16 +339,30 @@ func (o *routerOutputOp) closeLocked(ctx context.Context) {
// cancel wakes up a reader in Next if there is one and results in the output
// returning zero length batches for every Next call after cancel. Note that
// all accumulated data that hasn't been read will not be returned.
func (o *routerOutputOp) cancel(ctx context.Context) {
func (o *routerOutputOp) cancel(ctx context.Context, err error) {
o.mu.Lock()
defer o.mu.Unlock()
o.closeLocked(ctx)
o.forwardErrLocked(err)
// Some goroutine might be waiting on the condition variable, so wake it up.
// Note that read goroutines check o.mu.done, so won't wait on the condition
// variable after we unlock the mutex.
o.mu.cond.Signal()
}

// forwardErrLocked records err as the output's forwarded error, overwriting
// any error stored by an earlier call. A nil err is ignored, so a previously
// stored error is never cleared through this method. The callers visible in
// this file acquire o.mu before calling it.
func (o *routerOutputOp) forwardErrLocked(err error) {
	if err == nil {
		return
	}
	o.mu.forwardedErr = err
}

// forwardErr stores err (if non-nil) as the output's forwarded error while
// holding o.mu, then signals the output's condition variable. Next checks the
// forwarded error after waking up and raises it via
// colexecerror.ExpectedError.
func (o *routerOutputOp) forwardErr(err error) {
o.mu.Lock()
defer o.mu.Unlock()
o.forwardErrLocked(err)
// Wake up a goroutine that may be blocked in Next waiting on the condition
// variable so that it can observe the forwarded error. The signal is sent
// even when err is nil.
o.mu.cond.Signal()
}

// addBatch copies the columns in batch according to selection into an internal
// buffer.
// The routerOutputOp only adds the elements specified by selection. Therefore,
Expand Down Expand Up @@ -449,11 +477,12 @@ func (o *routerOutputOp) maybeUnblockLocked() {
}
}

// resetForBenchmarks resets the routerOutputOp for a benchmark run.
func (o *routerOutputOp) resetForBenchmarks(ctx context.Context) {
// resetForTests resets the routerOutputOp for a test or benchmark run.
func (o *routerOutputOp) resetForTests(ctx context.Context) {
o.mu.Lock()
defer o.mu.Unlock()
o.mu.state = routerOutputOpRunning
o.mu.forwardedErr = nil
o.mu.data.reset(ctx)
o.mu.numUnread = 0
o.mu.blocked = false
Expand Down Expand Up @@ -605,20 +634,22 @@ func newHashRouterWithOutputs(
return r
}

// bufferErr wraps a non-nil err in a ProducerMetadata and appends it to the
// router's buffered metadata so that it can be returned by one of the router
// outputs. A nil err is a no-op.
func (r *HashRouter) bufferErr(err error) {
	if err != nil {
		r.bufferedMeta = append(r.bufferedMeta, execinfrapb.ProducerMetadata{Err: err})
	}
}

func (r *HashRouter) cancelOutputs(ctx context.Context) {
// cancelOutputs cancels all outputs and forwards the given error to one output
// if non-nil. The only case where the error is not forwarded is if no output could
// be canceled due to an error. In this case each output will forward the error
// returned during cancellation.
func (r *HashRouter) cancelOutputs(ctx context.Context, errToForward error) {
for _, o := range r.outputs {
if err := colexecerror.CatchVectorizedRuntimeError(func() {
o.cancel(ctx)
o.cancel(ctx, errToForward)
}); err != nil {
r.bufferErr(err)
// If there was an error canceling this output, this error can be
// forwarded to whoever is calling Next.
o.forwardErr(err)
} else {
// Successful cancellation, which means errToForward was also consumed.
// Set it to nil to not forward it to another output.
errToForward = nil
}
}
}
Expand All @@ -641,15 +672,6 @@ func (r *HashRouter) Run(ctx context.Context) {
// well for more fine-grained control of error propagation.
if err := colexecerror.CatchVectorizedRuntimeError(func() {
r.input.Init()
// bufferErrAndCancelOutputs buffers non-nil error as metadata, cancels all
// of the outputs additionally buffering any error if such occurs during the
// outputs' cancellation as metadata as well. Note that it attempts to
// cancel every output regardless of whether "previous" output's
// cancellation succeeds.
bufferErrAndCancelOutputs := func(err error) {
r.bufferErr(err)
r.cancelOutputs(ctx)
}
var done bool
processNextBatch := func() {
done = r.processNextBatch(ctx)
Expand All @@ -662,7 +684,7 @@ func (r *HashRouter) Run(ctx context.Context) {
// Check for cancellation.
select {
case <-ctx.Done():
bufferErrAndCancelOutputs(ctx.Err())
r.cancelOutputs(ctx, ctx.Err())
return
default:
}
Expand All @@ -685,13 +707,13 @@ func (r *HashRouter) Run(ctx context.Context) {
case <-r.unblockedEventsChan:
r.numBlockedOutputs--
case <-ctx.Done():
bufferErrAndCancelOutputs(ctx.Err())
r.cancelOutputs(ctx, ctx.Err())
return
}
}

if err := colexecerror.CatchVectorizedRuntimeError(processNextBatch); err != nil {
bufferErrAndCancelOutputs(err)
r.cancelOutputs(ctx, err)
return
}
if done {
Expand All @@ -701,7 +723,7 @@ func (r *HashRouter) Run(ctx context.Context) {
}
}
}); err != nil {
r.bufferErr(err)
r.cancelOutputs(ctx, err)
}

// Non-blocking send of metadata so that one of the outputs can return it
Expand Down Expand Up @@ -738,8 +760,8 @@ func (r *HashRouter) processNextBatch(ctx context.Context) bool {
return false
}

// resetForBenchmarks resets the HashRouter for a benchmark run.
func (r *HashRouter) resetForBenchmarks(ctx context.Context) {
// resetForTests resets the HashRouter for a test or benchmark run.
func (r *HashRouter) resetForTests(ctx context.Context) {
if i, ok := r.input.(resetter); ok {
i.reset(ctx)
}
Expand All @@ -756,19 +778,17 @@ func (r *HashRouter) resetForBenchmarks(ctx context.Context) {
}
}
for _, o := range r.outputs {
if op, ok := o.(*routerOutputOp); ok {
op.resetForBenchmarks(ctx)
}
o.resetForTests(ctx)
}
}

func (r *HashRouter) encounteredError(ctx context.Context, err error) {
func (r *HashRouter) encounteredError(ctx context.Context) {
// Once one output returns an error the hash router needs to stop running
// and drain its input.
r.setDrainState(hashRouterDrainStateRequested)
// cancel all outputs. The Run goroutine will eventually realize that the
// HashRouter is done and exit without draining.
r.cancelOutputs(ctx)
r.cancelOutputs(ctx, nil /* errToForward */)
}

func (r *HashRouter) drainMeta() []execinfrapb.ProducerMetadata {
Expand Down
Loading

0 comments on commit af0031e

Please sign in to comment.