Skip to content

Commit

Permalink
rowflow: make routers propagate errors to all non-closed outputs
Browse files Browse the repository at this point in the history
This commit changes the way we propagate the errors in the hash router
so that the error metadata is sent on all non-closed streams.
Previously, we would send it over only the first non-closed stream,
which could result in the processors on the same stage as that single
stream's endpoint treating the absence of rows and errors as the input
being exhausted successfully, which is wrong because the input did
encounter an error.

Note that unlike in the "original" commit of this backport, we don't
need to make any changes in the vectorized engine because
`colexec.HashRouter` buffers all errors during its run and returns all
of them as metadata.

Release note (bug fix): Previously, CockroachDB could return incorrect
results on a query that encountered a ReadWithinUncertaintyInterval
error; this has been fixed.
  • Loading branch information
yuzefovich committed Aug 3, 2020
1 parent b0a0b8b commit fae195c
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 83 deletions.
54 changes: 43 additions & 11 deletions pkg/sql/rowflow/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,32 +404,63 @@ func (rb *routerBase) updateStreamState(
}
}

// fwdMetadata forwards a metadata record to the first stream that's still
// accepting data.
// fwdMetadata forwards a metadata record to streams that are still accepting
// data. Note that if the metadata record contains an error, it is propagated
// to all non-closed streams whereas all other types of metadata are propagated
// only to the first non-closed stream.
func (rb *routerBase) fwdMetadata(meta *execinfrapb.ProducerMetadata) {
if meta == nil {
log.Fatalf(context.TODO(), "asked to fwd empty metadata")
}

rb.semaphore <- struct{}{}
defer func() {
<-rb.semaphore
}()
if metaErr := meta.Err; metaErr != nil {
// Forward the error to all non-closed streams.
if rb.fwdErrMetadata(metaErr) {
return
}
} else {
// Forward the metadata to the first non-closed stream.
for i := range rb.outputs {
ro := &rb.outputs[i]
ro.mu.Lock()
if ro.mu.streamStatus != execinfra.ConsumerClosed {
ro.addMetadataLocked(meta)
ro.mu.Unlock()
ro.mu.cond.Signal()
return
}
ro.mu.Unlock()
}
}
// If we got here it means that we couldn't even forward metadata anywhere;
// all streams are closed.
atomic.StoreUint32(&rb.aggregatedStatus, uint32(execinfra.ConsumerClosed))
}

// fwdErrMetadata forwards err to all non-closed streams and returns a boolean
// indicating whether it was sent on at least one stream. Note that this method
// assumes that rb.semaphore has been acquired and leaves it up to the caller
// to release it.
func (rb *routerBase) fwdErrMetadata(err error) bool {
forwarded := false
for i := range rb.outputs {
ro := &rb.outputs[i]
ro.mu.Lock()
if ro.mu.streamStatus != execinfra.ConsumerClosed {
meta := &execinfrapb.ProducerMetadata{Err: err}
ro.addMetadataLocked(meta)
ro.mu.Unlock()
ro.mu.cond.Signal()
<-rb.semaphore
return
forwarded = true
} else {
ro.mu.Unlock()
}
ro.mu.Unlock()
}

<-rb.semaphore
// If we got here it means that we couldn't even forward metadata anywhere;
// all streams are closed.
atomic.StoreUint32(&rb.aggregatedStatus, uint32(execinfra.ConsumerClosed))
return forwarded
}

func (rb *routerBase) shouldUseSemaphore() bool {
Expand Down Expand Up @@ -490,7 +521,8 @@ func (mr *mirrorRouter) Push(
aggStatus := mr.aggStatus()
if meta != nil {
mr.fwdMetadata(meta)
return aggStatus
// fwdMetadata can change the status, re-read it.
return mr.aggStatus()
}
if aggStatus != execinfra.NeedMoreRows {
return aggStatus
Expand Down
200 changes: 128 additions & 72 deletions pkg/sql/rowflow/routers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"math"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -442,9 +443,9 @@ func preimageAttack(
}
}

// Test that metadata records get forwarded by routers. Regardless of the type
// of router, the records are supposed to be forwarded on the first output
// stream that's not closed.
// Test that metadata records get forwarded by routers. Depending on the type
// of the metadata, it might need to be forwarded to either one or all
// non-closed streams (regardless of the type of the router).
func TestMetadataIsForwarded(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -473,89 +474,144 @@ func TestMetadataIsForwarded(t *testing.T) {
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
chans := make([]execinfra.RowChannel, 2)
recvs := make([]execinfra.RowReceiver, 2)
tc.spec.Streams = make([]execinfrapb.StreamEndpointSpec, 2)
for i := 0; i < 2; i++ {
chans[i].InitWithBufSizeAndNumSenders(nil /* no column types */, 1, 1)
recvs[i] = &chans[i]
tc.spec.Streams[i] = execinfrapb.StreamEndpointSpec{StreamID: execinfrapb.StreamID(i)}
}
router, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, nil /* no columns */, recvs)

err1 := errors.Errorf("test error 1")
err2 := errors.Errorf("test error 2")
err3 := errors.Errorf("test error 3")
err4 := errors.Errorf("test error 4")

// Push metadata; it should go to stream 0.
for i := 0; i < 10; i++ {
consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err1})
if consumerStatus != execinfra.NeedMoreRows {
t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
// metaConfigs describes different configurations of metadata handling. It
// assumes the test is working in 4 stages with the following events:
// - stage 1 is finished with ConsumerDone() call on stream 0
// - stage 2 is finished with ConsumerClosed() call on stream 0 (at this
// point only stream 1 is non-closed)
// - stage 3 is finished with ConsumerClosed() call on stream 1 (at this
// point all streams are closed).
metaConfigs := []struct {
name string
getMeta func(stage int) *execinfrapb.ProducerMetadata
// getReceiverStreamIDs returns the streamIDs of streams that are
// expected to receive the metadata on the given stage.
getReceiverStreamIDs func(stage int) []int
assertExpected func(streamID int, meta *execinfrapb.ProducerMetadata, stage int)
}{
{
name: "error",
getMeta: func(stage int) *execinfrapb.ProducerMetadata {
return &execinfrapb.ProducerMetadata{
Err: errors.Errorf("test error %d", stage),
}
_, meta := chans[0].Next()
if meta.Err != err1 {
t.Fatalf("unexpected meta.Err %v, expected %s", meta.Err, err1)
},
getReceiverStreamIDs: func(stage int) []int {
switch stage {
case 1, 2:
// Errors are propagated to all non-closed streams.
return []int{0, 1}
default:
// Stream 0 is closed after stage 2, so now only stream 1
// is expected to receive metadata.
return []int{1}
}
}

chans[0].ConsumerDone()
// Push metadata; it should still go to stream 0.
for i := 0; i < 10; i++ {
consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err2})
if consumerStatus != execinfra.NeedMoreRows {
t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
},
assertExpected: func(streamID int, meta *execinfrapb.ProducerMetadata, stage int) {
expected := fmt.Sprintf("test error %d", stage)
if !strings.Contains(meta.Err.Error(), expected) {
t.Fatalf("stream %d: unexpected meta.Err %v, expected %s", streamID, meta.Err, expected)
}
_, meta := chans[0].Next()
if meta.Err != err2 {
t.Fatalf("unexpected meta.Err %v, expected %s", meta.Err, err2)
},
},
{
name: "non-error",
getMeta: func(stage int) *execinfrapb.ProducerMetadata {
return &execinfrapb.ProducerMetadata{
RowNum: &execinfrapb.RemoteProducerMetadata_RowNum{RowNum: int32(stage)},
}
}

chans[0].ConsumerClosed()
},
getReceiverStreamIDs: func(stage int) []int {
switch stage {
case 1, 2:
return []int{0}
default:
return []int{1}
}
},
assertExpected: func(streamID int, meta *execinfrapb.ProducerMetadata, stage int) {
if meta.RowNum.RowNum != int32(stage) {
t.Fatalf("streamID %d: unexpected meta %v, expected RowNum=%d in stage %d", streamID, meta, stage, stage)
}
},
},
}

// Metadata should switch to going to stream 1 once the new status is
// observed.
testutils.SucceedsSoon(t, func() error {
consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err3})
if consumerStatus != execinfra.NeedMoreRows {
t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
for _, tc := range testCases {
for _, metaConfig := range metaConfigs {
t.Run(fmt.Sprintf("%s/%s", tc.name, metaConfig.name), func(t *testing.T) {
chans := make([]execinfra.RowChannel, 2)
recvs := make([]execinfra.RowReceiver, 2)
tc.spec.Streams = make([]execinfrapb.StreamEndpointSpec, 2)
for i := 0; i < 2; i++ {
chans[i].InitWithBufSizeAndNumSenders(nil /* no column types */, 1, 1)
recvs[i] = &chans[i]
tc.spec.Streams[i] = execinfrapb.StreamEndpointSpec{StreamID: execinfrapb.StreamID(i)}
}
// Receive on stream 1 if there is a message waiting. Metadata may still
// try to go to 0 for a little while.
select {
case d := <-chans[1].C:
if d.Meta.Err != err3 {
t.Fatalf("unexpected meta.Err %v, expected %s", d.Meta.Err, err3)
router, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, nil /* no columns */, recvs)

stage := 1
for i := 0; i < 10; i++ {
consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
if consumerStatus != execinfra.NeedMoreRows {
t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
}
for _, streamID := range metaConfig.getReceiverStreamIDs(stage) {
_, meta := chans[streamID].Next()
metaConfig.assertExpected(streamID, meta, stage)
}
return nil
default:
return errors.Errorf("no metadata on stream 1")
}
})
chans[0].ConsumerDone()

chans[1].ConsumerClosed()
stage = 2
for i := 0; i < 10; i++ {
consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
if consumerStatus != execinfra.NeedMoreRows {
t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
}
for _, streamID := range metaConfig.getReceiverStreamIDs(stage) {
_, meta := chans[streamID].Next()
metaConfig.assertExpected(streamID, meta, stage)
}
}
chans[0].ConsumerClosed()

// Start draining the channels in the background.
for i := range chans {
go drainRowChannel(&chans[i])
}
stage = 3
testutils.SucceedsSoon(t, func() error {
consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
if consumerStatus != execinfra.NeedMoreRows {
t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
}
// Receive on stream 1 if there is a message waiting. Metadata may still
// try to go to 0 for a little while.
select {
case d := <-chans[1].C:
metaConfig.assertExpected(1 /* streamID */, d.Meta, stage)
return nil
default:
return errors.Errorf("no metadata on stream 1")
}
})
chans[1].ConsumerClosed()

testutils.SucceedsSoon(t, func() error {
consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err4})
if consumerStatus != execinfra.ConsumerClosed {
return fmt.Errorf("expected status %d, got: %d", execinfra.ConsumerClosed, consumerStatus)
stage = 4
// Start draining the channels in the background.
for i := range chans {
go drainRowChannel(&chans[i])
}
return nil
})
testutils.SucceedsSoon(t, func() error {
consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
if consumerStatus != execinfra.ConsumerClosed {
return fmt.Errorf("expected status %d, got: %d", execinfra.ConsumerClosed, consumerStatus)
}
return nil
})

router.ProducerDone()
router.ProducerDone()

wg.Wait()
})
wg.Wait()
})
}
}
}

Expand Down

0 comments on commit fae195c

Please sign in to comment.