Merge pull request #52252 from yuzefovich/backport19.2-51518
release-19.2: rowflow: make routers propagate errors to all non-closed outputs
yuzefovich authored Aug 6, 2020
2 parents 69cd5b4 + fae195c commit b806580
Showing 2 changed files with 171 additions and 83 deletions.
54 changes: 43 additions & 11 deletions pkg/sql/rowflow/routers.go
@@ -404,32 +404,63 @@ func (rb *routerBase) updateStreamState(
 	}
 }
 
-// fwdMetadata forwards a metadata record to the first stream that's still
-// accepting data.
+// fwdMetadata forwards a metadata record to streams that are still accepting
+// data. Note that if the metadata record contains an error, it is propagated
+// to all non-closed streams whereas all other types of metadata are propagated
+// only to the first non-closed stream.
 func (rb *routerBase) fwdMetadata(meta *execinfrapb.ProducerMetadata) {
 	if meta == nil {
 		log.Fatalf(context.TODO(), "asked to fwd empty metadata")
 	}
 
 	rb.semaphore <- struct{}{}
+	defer func() {
+		<-rb.semaphore
+	}()
+	if metaErr := meta.Err; metaErr != nil {
+		// Forward the error to all non-closed streams.
+		if rb.fwdErrMetadata(metaErr) {
+			return
+		}
+	} else {
+		// Forward the metadata to the first non-closed stream.
+		for i := range rb.outputs {
+			ro := &rb.outputs[i]
+			ro.mu.Lock()
+			if ro.mu.streamStatus != execinfra.ConsumerClosed {
+				ro.addMetadataLocked(meta)
+				ro.mu.Unlock()
+				ro.mu.cond.Signal()
+				return
+			}
+			ro.mu.Unlock()
+		}
+	}
+	// If we got here it means that we couldn't even forward metadata anywhere;
+	// all streams are closed.
+	atomic.StoreUint32(&rb.aggregatedStatus, uint32(execinfra.ConsumerClosed))
+}
+
+// fwdErrMetadata forwards err to all non-closed streams and returns a boolean
+// indicating whether it was sent on at least one stream. Note that this method
+// assumes that rb.semaphore has been acquired and leaves it up to the caller
+// to release it.
+func (rb *routerBase) fwdErrMetadata(err error) bool {
+	forwarded := false
 	for i := range rb.outputs {
 		ro := &rb.outputs[i]
 		ro.mu.Lock()
 		if ro.mu.streamStatus != execinfra.ConsumerClosed {
+			meta := &execinfrapb.ProducerMetadata{Err: err}
 			ro.addMetadataLocked(meta)
 			ro.mu.Unlock()
 			ro.mu.cond.Signal()
-			<-rb.semaphore
-			return
+			forwarded = true
+		} else {
+			ro.mu.Unlock()
 		}
-		ro.mu.Unlock()
 	}
-
-	<-rb.semaphore
-	// If we got here it means that we couldn't even forward metadata anywhere;
-	// all streams are closed.
-	atomic.StoreUint32(&rb.aggregatedStatus, uint32(execinfra.ConsumerClosed))
+	return forwarded
 }
 
 func (rb *routerBase) shouldUseSemaphore() bool {
@@ -490,7 +521,8 @@ func (mr *mirrorRouter) Push(
 	aggStatus := mr.aggStatus()
 	if meta != nil {
 		mr.fwdMetadata(meta)
-		return aggStatus
+		// fwdMetadata can change the status, re-read it.
+		return mr.aggStatus()
 	}
 	if aggStatus != execinfra.NeedMoreRows {
 		return aggStatus
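
The routing rule the patch introduces is easier to see in isolation. The following is a minimal sketch of the same pattern in plain Go — the `output` type, string payloads, and function names are hypothetical stand-ins for routerBase's outputs and execinfrapb.ProducerMetadata, and the semaphore and condition-variable signaling from the real code are omitted:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// output is a hypothetical stand-in for a router's per-stream state; the real
// routerBase guards streamStatus with a mutex and a condition variable.
type output struct {
	mu     sync.Mutex
	closed bool
	queue  []string
}

// fwdError mirrors the fwdErrMetadata contract: deliver err to every
// non-closed output and report whether at least one delivery happened.
func fwdError(outs []*output, err error) bool {
	forwarded := false
	for _, o := range outs {
		o.mu.Lock()
		if !o.closed {
			o.queue = append(o.queue, "err: "+err.Error())
			forwarded = true
		}
		o.mu.Unlock()
	}
	return forwarded
}

// fwdOther mirrors the non-error path: deliver only to the first non-closed
// output, then stop.
func fwdOther(outs []*output, meta string) bool {
	for _, o := range outs {
		o.mu.Lock()
		if !o.closed {
			o.queue = append(o.queue, meta)
			o.mu.Unlock()
			return true
		}
		o.mu.Unlock()
	}
	return false
}

func main() {
	outs := []*output{{}, {}}
	fwdOther(outs, "rownum")           // only outs[0] receives it
	fwdError(outs, errors.New("boom")) // both outputs receive it
	outs[0].closed = true              // single goroutine here, so no lock needed
	fwdError(outs, errors.New("boom")) // now only outs[1] receives it
	for i, o := range outs {
		fmt.Println(i, o.queue) // 0 [rownum err: boom]; 1 [err: boom err: boom]
	}
}

The property the patch establishes is visible in fwdError: unlike fwdOther, it keeps iterating after a successful delivery, so an error reaches every consumer that could still act on it rather than only the first open stream.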
200 changes: 128 additions & 72 deletions pkg/sql/rowflow/routers_test.go
@@ -16,6 +16,7 @@ import (
"fmt"
"math"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
@@ -442,9 +443,9 @@ func preimageAttack(
 	}
 }
 
-// Test that metadata records get forwarded by routers. Regardless of the type
-// of router, the records are supposed to be forwarded on the first output
-// stream that's not closed.
+// Test that metadata records get forwarded by routers. Depending on the type
+// of the metadata, it might need to be forwarded to either one or all
+// non-closed streams (regardless of the type of the router).
 func TestMetadataIsForwarded(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
@@ -473,89 +474,144 @@ func TestMetadataIsForwarded(t *testing.T) {
 		},
 	}
 
-	for _, tc := range testCases {
-		t.Run(tc.name, func(t *testing.T) {
-			chans := make([]execinfra.RowChannel, 2)
-			recvs := make([]execinfra.RowReceiver, 2)
-			tc.spec.Streams = make([]execinfrapb.StreamEndpointSpec, 2)
-			for i := 0; i < 2; i++ {
-				chans[i].InitWithBufSizeAndNumSenders(nil /* no column types */, 1, 1)
-				recvs[i] = &chans[i]
-				tc.spec.Streams[i] = execinfrapb.StreamEndpointSpec{StreamID: execinfrapb.StreamID(i)}
-			}
-			router, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, nil /* no columns */, recvs)
-
-			err1 := errors.Errorf("test error 1")
-			err2 := errors.Errorf("test error 2")
-			err3 := errors.Errorf("test error 3")
-			err4 := errors.Errorf("test error 4")
-
-			// Push metadata; it should go to stream 0.
-			for i := 0; i < 10; i++ {
-				consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err1})
-				if consumerStatus != execinfra.NeedMoreRows {
-					t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
-				}
-				_, meta := chans[0].Next()
-				if meta.Err != err1 {
-					t.Fatalf("unexpected meta.Err %v, expected %s", meta.Err, err1)
-				}
-			}
-
-			chans[0].ConsumerDone()
-			// Push metadata; it should still go to stream 0.
-			for i := 0; i < 10; i++ {
-				consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err2})
-				if consumerStatus != execinfra.NeedMoreRows {
-					t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
-				}
-				_, meta := chans[0].Next()
-				if meta.Err != err2 {
-					t.Fatalf("unexpected meta.Err %v, expected %s", meta.Err, err2)
-				}
-			}
-
-			chans[0].ConsumerClosed()
-
-			// Metadata should switch to going to stream 1 once the new status is
-			// observed.
-			testutils.SucceedsSoon(t, func() error {
-				consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err3})
-				if consumerStatus != execinfra.NeedMoreRows {
-					t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
-				}
-				// Receive on stream 1 if there is a message waiting. Metadata may still
-				// try to go to 0 for a little while.
-				select {
-				case d := <-chans[1].C:
-					if d.Meta.Err != err3 {
-						t.Fatalf("unexpected meta.Err %v, expected %s", d.Meta.Err, err3)
-					}
-					return nil
-				default:
-					return errors.Errorf("no metadata on stream 1")
-				}
-			})
-
-			chans[1].ConsumerClosed()
-
-			// Start drain the channels in the background.
-			for i := range chans {
-				go drainRowChannel(&chans[i])
-			}
-			testutils.SucceedsSoon(t, func() error {
-				consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err4})
-				if consumerStatus != execinfra.ConsumerClosed {
-					return fmt.Errorf("expected status %d, got: %d", execinfra.ConsumerClosed, consumerStatus)
-				}
-				return nil
-			})
-
-			router.ProducerDone()
-
-			wg.Wait()
-		})
-	}
+	// metaConfigs describes different configurations of metadata handling. It
+	// assumes the test works in 4 stages with the following events:
+	// - stage 1 is finished with a ConsumerDone() call on stream 0
+	// - stage 2 is finished with a ConsumerClosed() call on stream 0 (at this
+	//   point only stream 1 is non-closed)
+	// - stage 3 is finished with a ConsumerClosed() call on stream 1 (at this
+	//   point all streams are closed).
+	metaConfigs := []struct {
+		name    string
+		getMeta func(stage int) *execinfrapb.ProducerMetadata
+		// getReceiverStreamIDs returns the streamIDs of streams that are
+		// expected to receive the metadata on the given stage.
+		getReceiverStreamIDs func(stage int) []int
+		assertExpected       func(streamID int, meta *execinfrapb.ProducerMetadata, stage int)
+	}{
+		{
+			name: "error",
+			getMeta: func(stage int) *execinfrapb.ProducerMetadata {
+				return &execinfrapb.ProducerMetadata{
+					Err: errors.Errorf("test error %d", stage),
+				}
+			},
+			getReceiverStreamIDs: func(stage int) []int {
+				switch stage {
+				case 1, 2:
+					// Errors are propagated to all non-closed streams.
+					return []int{0, 1}
+				default:
+					// Stream 0 is closed after stage 2, so now only stream 1
+					// is expected to receive metadata.
+					return []int{1}
+				}
+			},
+			assertExpected: func(streamID int, meta *execinfrapb.ProducerMetadata, stage int) {
+				expected := fmt.Sprintf("test error %d", stage)
+				if !strings.Contains(meta.Err.Error(), expected) {
+					t.Fatalf("stream %d: unexpected meta.Err %v, expected %s", streamID, meta.Err, expected)
+				}
+			},
+		},
+		{
+			name: "non-error",
+			getMeta: func(stage int) *execinfrapb.ProducerMetadata {
+				return &execinfrapb.ProducerMetadata{
+					RowNum: &execinfrapb.RemoteProducerMetadata_RowNum{RowNum: int32(stage)},
+				}
+			},
+			getReceiverStreamIDs: func(stage int) []int {
+				switch stage {
+				case 1, 2:
+					return []int{0}
+				default:
+					return []int{1}
+				}
+			},
+			assertExpected: func(streamID int, meta *execinfrapb.ProducerMetadata, stage int) {
+				if meta.RowNum.RowNum != int32(stage) {
+					t.Fatalf("streamID %d: unexpected meta %v, expected RowNum=%d in stage %d", streamID, meta, stage, stage)
+				}
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		for _, metaConfig := range metaConfigs {
+			t.Run(fmt.Sprintf("%s/%s", tc.name, metaConfig.name), func(t *testing.T) {
+				chans := make([]execinfra.RowChannel, 2)
+				recvs := make([]execinfra.RowReceiver, 2)
+				tc.spec.Streams = make([]execinfrapb.StreamEndpointSpec, 2)
+				for i := 0; i < 2; i++ {
+					chans[i].InitWithBufSizeAndNumSenders(nil /* no column types */, 1, 1)
+					recvs[i] = &chans[i]
+					tc.spec.Streams[i] = execinfrapb.StreamEndpointSpec{StreamID: execinfrapb.StreamID(i)}
+				}
+				router, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, nil /* no columns */, recvs)
+
+				stage := 1
+				for i := 0; i < 10; i++ {
+					consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
+					if consumerStatus != execinfra.NeedMoreRows {
+						t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
+					}
+					for _, streamID := range metaConfig.getReceiverStreamIDs(stage) {
+						_, meta := chans[streamID].Next()
+						metaConfig.assertExpected(streamID, meta, stage)
+					}
+				}
+				chans[0].ConsumerDone()
+
+				stage = 2
+				for i := 0; i < 10; i++ {
+					consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
+					if consumerStatus != execinfra.NeedMoreRows {
+						t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
+					}
+					for _, streamID := range metaConfig.getReceiverStreamIDs(stage) {
+						_, meta := chans[streamID].Next()
+						metaConfig.assertExpected(streamID, meta, stage)
+					}
+				}
+				chans[0].ConsumerClosed()
+
+				stage = 3
+				testutils.SucceedsSoon(t, func() error {
+					consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
+					if consumerStatus != execinfra.NeedMoreRows {
+						t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
+					}
+					// Receive on stream 1 if there is a message waiting. Metadata may still
+					// try to go to 0 for a little while.
+					select {
+					case d := <-chans[1].C:
+						metaConfig.assertExpected(1 /* streamID */, d.Meta, stage)
+						return nil
+					default:
+						return errors.Errorf("no metadata on stream 1")
+					}
+				})
+				chans[1].ConsumerClosed()
+
+				stage = 4
+				// Start draining the channels in the background.
+				for i := range chans {
+					go drainRowChannel(&chans[i])
+				}
+				testutils.SucceedsSoon(t, func() error {
+					consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
+					if consumerStatus != execinfra.ConsumerClosed {
+						return fmt.Errorf("expected status %d, got: %d", execinfra.ConsumerClosed, consumerStatus)
+					}
+					return nil
+				})
+
+				router.ProducerDone()
+
+				wg.Wait()
+			})
+		}
+	}
 }
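
To make the staged assertions above easier to follow, here is a small self-contained model of how an aggregated consumer status could be derived from per-stream statuses. This is an assumption-level sketch consistent only with what the test asserts — Push keeps returning NeedMoreRows while any stream is open and ConsumerClosed once every stream is closed — not the actual atomic bookkeeping in routerBase:

package main

import "fmt"

// status is a hypothetical mirror of execinfra's consumer statuses.
type status int

const (
	needMoreRows status = iota
	drainRequested
	consumerClosed
)

// aggregate models the rule the test observes: keep accepting input while at
// least one output stream is not closed; report consumerClosed only once all
// streams are closed. (The real router also handles draining; omitted here.)
func aggregate(streams []status) status {
	for _, s := range streams {
		if s != consumerClosed {
			return needMoreRows
		}
	}
	return consumerClosed
}

func main() {
	streams := []status{needMoreRows, needMoreRows}
	fmt.Println(aggregate(streams)) // stage 1: 0 (needMoreRows)

	streams[0] = drainRequested // ConsumerDone() on stream 0 ends stage 1
	fmt.Println(aggregate(streams)) // stage 2: still 0

	streams[0] = consumerClosed // ConsumerClosed() on stream 0 ends stage 2
	fmt.Println(aggregate(streams)) // stage 3: still 0, stream 1 remains open

	streams[1] = consumerClosed // ConsumerClosed() on stream 1 ends stage 3
	fmt.Println(aggregate(streams)) // stage 4: 2 (consumerClosed)
}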
