Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-19.2: rowflow: make routers propagate errors to all non-closed outputs #52252

Merged
merged 1 commit into from
Aug 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 43 additions & 11 deletions pkg/sql/rowflow/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,32 +404,63 @@ func (rb *routerBase) updateStreamState(
}
}

// fwdMetadata forwards a metadata record to the first stream that's still
// accepting data.
// fwdMetadata forwards a metadata record to streams that are still accepting
// data. Note that if the metadata record contains an error, it is propagated
// to all non-closed streams whereas all other types of metadata are propagated
// only to the first non-closed stream.
func (rb *routerBase) fwdMetadata(meta *execinfrapb.ProducerMetadata) {
if meta == nil {
log.Fatalf(context.TODO(), "asked to fwd empty metadata")
}

rb.semaphore <- struct{}{}
defer func() {
<-rb.semaphore
}()
if metaErr := meta.Err; metaErr != nil {
// Forward the error to all non-closed streams.
if rb.fwdErrMetadata(metaErr) {
return
}
} else {
// Forward the metadata to the first non-closed stream.
for i := range rb.outputs {
ro := &rb.outputs[i]
ro.mu.Lock()
if ro.mu.streamStatus != execinfra.ConsumerClosed {
ro.addMetadataLocked(meta)
ro.mu.Unlock()
ro.mu.cond.Signal()
return
}
ro.mu.Unlock()
}
}
// If we got here it means that we couldn't even forward metadata anywhere;
// all streams are closed.
atomic.StoreUint32(&rb.aggregatedStatus, uint32(execinfra.ConsumerClosed))
}

// fwdErrMetadata forwards err to all non-closed streams and returns a boolean
// indicating whether it was sent on at least one stream. Note that this method
// assumes that rb.semaphore has been acquired and leaves it up to the caller
// to release it.
func (rb *routerBase) fwdErrMetadata(err error) bool {
forwarded := false
for i := range rb.outputs {
ro := &rb.outputs[i]
ro.mu.Lock()
if ro.mu.streamStatus != execinfra.ConsumerClosed {
meta := &execinfrapb.ProducerMetadata{Err: err}
ro.addMetadataLocked(meta)
ro.mu.Unlock()
ro.mu.cond.Signal()
<-rb.semaphore
return
forwarded = true
} else {
ro.mu.Unlock()
}
ro.mu.Unlock()
}

<-rb.semaphore
// If we got here it means that we couldn't even forward metadata anywhere;
// all streams are closed.
atomic.StoreUint32(&rb.aggregatedStatus, uint32(execinfra.ConsumerClosed))
return forwarded
}

func (rb *routerBase) shouldUseSemaphore() bool {
Expand Down Expand Up @@ -490,7 +521,8 @@ func (mr *mirrorRouter) Push(
aggStatus := mr.aggStatus()
if meta != nil {
mr.fwdMetadata(meta)
return aggStatus
// fwdMetadata can change the status, re-read it.
return mr.aggStatus()
}
if aggStatus != execinfra.NeedMoreRows {
return aggStatus
Expand Down
200 changes: 128 additions & 72 deletions pkg/sql/rowflow/routers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"math"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -442,9 +443,9 @@ func preimageAttack(
}
}

// Test that metadata records get forwarded by routers. Regardless of the type
// of router, the records are supposed to be forwarded on the first output
// stream that's not closed.
// Test that metadata records get forwarded by routers. Depending on the type
// of the metadata, it might need to be forwarded to either one or all
// non-closed streams (regardless of the type of the router).
func TestMetadataIsForwarded(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -473,89 +474,144 @@ func TestMetadataIsForwarded(t *testing.T) {
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
chans := make([]execinfra.RowChannel, 2)
recvs := make([]execinfra.RowReceiver, 2)
tc.spec.Streams = make([]execinfrapb.StreamEndpointSpec, 2)
for i := 0; i < 2; i++ {
chans[i].InitWithBufSizeAndNumSenders(nil /* no column types */, 1, 1)
recvs[i] = &chans[i]
tc.spec.Streams[i] = execinfrapb.StreamEndpointSpec{StreamID: execinfrapb.StreamID(i)}
}
router, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, nil /* no columns */, recvs)

err1 := errors.Errorf("test error 1")
err2 := errors.Errorf("test error 2")
err3 := errors.Errorf("test error 3")
err4 := errors.Errorf("test error 4")

// Push metadata; it should go to stream 0.
for i := 0; i < 10; i++ {
consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err1})
if consumerStatus != execinfra.NeedMoreRows {
t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
// metaConfigs describe different configurations of metadata handling. The
// test works in 4 stages with the following events:
// - stage 1 is finished with ConsumerDone() call on stream 0
// - stage 2 is finished with ConsumerClosed() call on stream 0 (at this
// point only stream 1 is non-closed)
// - stage 3 is finished with ConsumerClosed() call on stream 1 (at this
// point all streams are closed).
metaConfigs := []struct {
name string
getMeta func(stage int) *execinfrapb.ProducerMetadata
// getReceiverStreamIDs returns the streamIDs of streams that are
// expected to receive the metadata on the given stage.
getReceiverStreamIDs func(stage int) []int
assertExpected func(streamID int, meta *execinfrapb.ProducerMetadata, stage int)
}{
{
name: "error",
getMeta: func(stage int) *execinfrapb.ProducerMetadata {
return &execinfrapb.ProducerMetadata{
Err: errors.Errorf("test error %d", stage),
}
_, meta := chans[0].Next()
if meta.Err != err1 {
t.Fatalf("unexpected meta.Err %v, expected %s", meta.Err, err1)
},
getReceiverStreamIDs: func(stage int) []int {
switch stage {
case 1, 2:
// Errors are propagated to all non-closed streams.
return []int{0, 1}
default:
// Stream 0 is closed after stage 2, so now only stream 1
// is expected to receive metadata.
return []int{1}
}
}

chans[0].ConsumerDone()
// Push metadata; it should still go to stream 0.
for i := 0; i < 10; i++ {
consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err2})
if consumerStatus != execinfra.NeedMoreRows {
t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
},
assertExpected: func(streamID int, meta *execinfrapb.ProducerMetadata, stage int) {
expected := fmt.Sprintf("test error %d", stage)
if !strings.Contains(meta.Err.Error(), expected) {
t.Fatalf("stream %d: unexpected meta.Err %v, expected %s", streamID, meta.Err, expected)
}
_, meta := chans[0].Next()
if meta.Err != err2 {
t.Fatalf("unexpected meta.Err %v, expected %s", meta.Err, err2)
},
},
{
name: "non-error",
getMeta: func(stage int) *execinfrapb.ProducerMetadata {
return &execinfrapb.ProducerMetadata{
RowNum: &execinfrapb.RemoteProducerMetadata_RowNum{RowNum: int32(stage)},
}
}

chans[0].ConsumerClosed()
},
getReceiverStreamIDs: func(stage int) []int {
switch stage {
case 1, 2:
return []int{0}
default:
return []int{1}
}
},
assertExpected: func(streamID int, meta *execinfrapb.ProducerMetadata, stage int) {
if meta.RowNum.RowNum != int32(stage) {
t.Fatalf("streamID %d: unexpected meta %v, expected RowNum=%d in stage %d", streamID, meta, stage, stage)
}
},
},
}

// Metadata should switch to going to stream 1 once the new status is
// observed.
testutils.SucceedsSoon(t, func() error {
consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err3})
if consumerStatus != execinfra.NeedMoreRows {
t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
for _, tc := range testCases {
for _, metaConfig := range metaConfigs {
t.Run(fmt.Sprintf("%s/%s", tc.name, metaConfig.name), func(t *testing.T) {
chans := make([]execinfra.RowChannel, 2)
recvs := make([]execinfra.RowReceiver, 2)
tc.spec.Streams = make([]execinfrapb.StreamEndpointSpec, 2)
for i := 0; i < 2; i++ {
chans[i].InitWithBufSizeAndNumSenders(nil /* no column types */, 1, 1)
recvs[i] = &chans[i]
tc.spec.Streams[i] = execinfrapb.StreamEndpointSpec{StreamID: execinfrapb.StreamID(i)}
}
// Receive on stream 1 if there is a message waiting. Metadata may still
// try to go to 0 for a little while.
select {
case d := <-chans[1].C:
if d.Meta.Err != err3 {
t.Fatalf("unexpected meta.Err %v, expected %s", d.Meta.Err, err3)
router, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, nil /* no columns */, recvs)

stage := 1
for i := 0; i < 10; i++ {
consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
if consumerStatus != execinfra.NeedMoreRows {
t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
}
for _, streamID := range metaConfig.getReceiverStreamIDs(stage) {
_, meta := chans[streamID].Next()
metaConfig.assertExpected(streamID, meta, stage)
}
return nil
default:
return errors.Errorf("no metadata on stream 1")
}
})
chans[0].ConsumerDone()

chans[1].ConsumerClosed()
stage = 2
for i := 0; i < 10; i++ {
consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
if consumerStatus != execinfra.NeedMoreRows {
t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
}
for _, streamID := range metaConfig.getReceiverStreamIDs(stage) {
_, meta := chans[streamID].Next()
metaConfig.assertExpected(streamID, meta, stage)
}
}
chans[0].ConsumerClosed()

// Start draining the channels in the background.
for i := range chans {
go drainRowChannel(&chans[i])
}
stage = 3
testutils.SucceedsSoon(t, func() error {
consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
if consumerStatus != execinfra.NeedMoreRows {
t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
}
// Receive on stream 1 if there is a message waiting. Metadata may still
// try to go to 0 for a little while.
select {
case d := <-chans[1].C:
metaConfig.assertExpected(1 /* streamID */, d.Meta, stage)
return nil
default:
return errors.Errorf("no metadata on stream 1")
}
})
chans[1].ConsumerClosed()

testutils.SucceedsSoon(t, func() error {
consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err4})
if consumerStatus != execinfra.ConsumerClosed {
return fmt.Errorf("expected status %d, got: %d", execinfra.ConsumerClosed, consumerStatus)
stage = 4
// Start draining the channels in the background.
for i := range chans {
go drainRowChannel(&chans[i])
}
return nil
})
testutils.SucceedsSoon(t, func() error {
consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
if consumerStatus != execinfra.ConsumerClosed {
return fmt.Errorf("expected status %d, got: %d", execinfra.ConsumerClosed, consumerStatus)
}
return nil
})

router.ProducerDone()
router.ProducerDone()

wg.Wait()
})
wg.Wait()
})
}
}
}

Expand Down