From fae195c984fa810a60f6232b03e31d240ef32c17 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Thu, 16 Jul 2020 11:19:25 -0700
Subject: [PATCH] rowflow: make routers propagate errors to all non-closed outputs

This commit changes the way we propagate errors in the hash router so
that the error metadata is sent on all non-closed streams. Previously,
we would send it over only the first non-closed stream, which could
result in the processors in the same stage as that single stream's
endpoint treating the absence of rows and errors as the input being
exhausted successfully, which is wrong because the input did encounter
an error.

Note that unlike in the "original" commit of this backport, we don't
need to make any changes in the vectorized engine because
`colexec.HashRouter` buffers all errors during its run and returns all
of them as metadata.

Release note (bug fix): Previously, CockroachDB could return incorrect
results on queries that encountered a ReadWithinUncertaintyInterval
error; this has been fixed.
---
 pkg/sql/rowflow/routers.go      |  54 +++++++--
 pkg/sql/rowflow/routers_test.go | 200 ++++++++++++++++++++------------
 2 files changed, 171 insertions(+), 83 deletions(-)

diff --git a/pkg/sql/rowflow/routers.go b/pkg/sql/rowflow/routers.go
index 051013db7456..0cffea06bd61 100644
--- a/pkg/sql/rowflow/routers.go
+++ b/pkg/sql/rowflow/routers.go
@@ -404,32 +404,63 @@ func (rb *routerBase) updateStreamState(
 	}
 }
 
-// fwdMetadata forwards a metadata record to the first stream that's still
-// accepting data.
+// fwdMetadata forwards a metadata record to streams that are still accepting
+// data. Note that if the metadata record contains an error, it is propagated
+// to all non-closed streams, whereas all other types of metadata are
+// propagated only to the first non-closed stream.
 func (rb *routerBase) fwdMetadata(meta *execinfrapb.ProducerMetadata) {
 	if meta == nil {
 		log.Fatalf(context.TODO(), "asked to fwd empty metadata")
 	}
 
 	rb.semaphore <- struct{}{}
+	defer func() {
+		<-rb.semaphore
+	}()
+	if metaErr := meta.Err; metaErr != nil {
+		// Forward the error to all non-closed streams.
+		if rb.fwdErrMetadata(metaErr) {
+			return
+		}
+	} else {
+		// Forward the metadata to the first non-closed stream.
+		for i := range rb.outputs {
+			ro := &rb.outputs[i]
+			ro.mu.Lock()
+			if ro.mu.streamStatus != execinfra.ConsumerClosed {
+				ro.addMetadataLocked(meta)
+				ro.mu.Unlock()
+				ro.mu.cond.Signal()
+				return
+			}
+			ro.mu.Unlock()
+		}
+	}
+	// If we got here it means that we couldn't even forward metadata anywhere;
+	// all streams are closed.
+	atomic.StoreUint32(&rb.aggregatedStatus, uint32(execinfra.ConsumerClosed))
+}
 
+// fwdErrMetadata forwards err to all non-closed streams and returns a boolean
+// indicating whether it was sent on at least one stream. Note that this method
+// assumes that rb.semaphore has been acquired and leaves it up to the caller
+// to release it.
+func (rb *routerBase) fwdErrMetadata(err error) bool {
+	forwarded := false
 	for i := range rb.outputs {
 		ro := &rb.outputs[i]
 		ro.mu.Lock()
 		if ro.mu.streamStatus != execinfra.ConsumerClosed {
+			meta := &execinfrapb.ProducerMetadata{Err: err}
 			ro.addMetadataLocked(meta)
 			ro.mu.Unlock()
 			ro.mu.cond.Signal()
-			<-rb.semaphore
-			return
+			forwarded = true
+		} else {
+			ro.mu.Unlock()
 		}
-		ro.mu.Unlock()
 	}
-
-	<-rb.semaphore
-	// If we got here it means that we couldn't even forward metadata anywhere;
-	// all streams are closed.
-	atomic.StoreUint32(&rb.aggregatedStatus, uint32(execinfra.ConsumerClosed))
+	return forwarded
 }
 
 func (rb *routerBase) shouldUseSemaphore() bool {
@@ -490,7 +521,8 @@ func (mr *mirrorRouter) Push(
 	aggStatus := mr.aggStatus()
 	if meta != nil {
 		mr.fwdMetadata(meta)
-		return aggStatus
+		// fwdMetadata can change the status, so re-read it.
+		return mr.aggStatus()
 	}
 	if aggStatus != execinfra.NeedMoreRows {
 		return aggStatus
diff --git a/pkg/sql/rowflow/routers_test.go b/pkg/sql/rowflow/routers_test.go
index 745b97289b93..2e1a6ce1ebcb 100644
--- a/pkg/sql/rowflow/routers_test.go
+++ b/pkg/sql/rowflow/routers_test.go
@@ -16,6 +16,7 @@ import (
 	"fmt"
 	"math"
 	"strconv"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -442,9 +443,9 @@ func preimageAttack(
 	}
 }
 
-// Test that metadata records get forwarded by routers. Regardless of the type
-// of router, the records are supposed to be forwarded on the first output
-// stream that's not closed.
+// Test that metadata records get forwarded by routers. Depending on the type
+// of the metadata, it might need to be forwarded to either one or all
+// non-closed streams (regardless of the type of the router).
 func TestMetadataIsForwarded(t *testing.T) {
 	defer leaktest.AfterTest(t)()
@@ -473,89 +474,144 @@ func TestMetadataIsForwarded(t *testing.T) {
 		},
 	}
 
-	for _, tc := range testCases {
-		t.Run(tc.name, func(t *testing.T) {
-			chans := make([]execinfra.RowChannel, 2)
-			recvs := make([]execinfra.RowReceiver, 2)
-			tc.spec.Streams = make([]execinfrapb.StreamEndpointSpec, 2)
-			for i := 0; i < 2; i++ {
-				chans[i].InitWithBufSizeAndNumSenders(nil /* no column types */, 1, 1)
-				recvs[i] = &chans[i]
-				tc.spec.Streams[i] = execinfrapb.StreamEndpointSpec{StreamID: execinfrapb.StreamID(i)}
-			}
-			router, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, nil /* no columns */, recvs)
-
-			err1 := errors.Errorf("test error 1")
-			err2 := errors.Errorf("test error 2")
-			err3 := errors.Errorf("test error 3")
-			err4 := errors.Errorf("test error 4")
-
-			// Push metadata; it should go to stream 0.
-			for i := 0; i < 10; i++ {
-				consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err1})
-				if consumerStatus != execinfra.NeedMoreRows {
-					t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
+	// metaConfigs describe different configurations of metadata handling. The
+	// test works in 4 stages with the following events:
+	// - stage 1 is finished with a ConsumerDone() call on stream 0
+	// - stage 2 is finished with a ConsumerClosed() call on stream 0 (at this
+	//   point only stream 1 is non-closed)
+	// - stage 3 is finished with a ConsumerClosed() call on stream 1 (at this
+	//   point all streams are closed).
+	metaConfigs := []struct {
+		name    string
+		getMeta func(stage int) *execinfrapb.ProducerMetadata
+		// getReceiverStreamIDs returns the streamIDs of streams that are
+		// expected to receive the metadata on the given stage.
+		getReceiverStreamIDs func(stage int) []int
+		assertExpected       func(streamID int, meta *execinfrapb.ProducerMetadata, stage int)
+	}{
+		{
+			name: "error",
+			getMeta: func(stage int) *execinfrapb.ProducerMetadata {
+				return &execinfrapb.ProducerMetadata{
+					Err: errors.Errorf("test error %d", stage),
 				}
-				_, meta := chans[0].Next()
-				if meta.Err != err1 {
-					t.Fatalf("unexpected meta.Err %v, expected %s", meta.Err, err1)
+			},
+			getReceiverStreamIDs: func(stage int) []int {
+				switch stage {
+				case 1, 2:
+					// Errors are propagated to all non-closed streams.
+					return []int{0, 1}
+				default:
+					// Stream 0 is closed after stage 2, so now only stream 1
+					// is expected to receive metadata.
+					return []int{1}
 				}
-			}
-
-			chans[0].ConsumerDone()
-			// Push metadata; it should still go to stream 0.
-			for i := 0; i < 10; i++ {
-				consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err2})
-				if consumerStatus != execinfra.NeedMoreRows {
-					t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
+			},
+			assertExpected: func(streamID int, meta *execinfrapb.ProducerMetadata, stage int) {
+				expected := fmt.Sprintf("test error %d", stage)
+				if !strings.Contains(meta.Err.Error(), expected) {
+					t.Fatalf("stream %d: unexpected meta.Err %v, expected %s", streamID, meta.Err, expected)
 				}
-				_, meta := chans[0].Next()
-				if meta.Err != err2 {
-					t.Fatalf("unexpected meta.Err %v, expected %s", meta.Err, err2)
+			},
+		},
+		{
+			name: "non-error",
+			getMeta: func(stage int) *execinfrapb.ProducerMetadata {
+				return &execinfrapb.ProducerMetadata{
+					RowNum: &execinfrapb.RemoteProducerMetadata_RowNum{RowNum: int32(stage)},
 				}
-			}
-
-			chans[0].ConsumerClosed()
+			},
+			getReceiverStreamIDs: func(stage int) []int {
+				switch stage {
+				case 1, 2:
+					return []int{0}
+				default:
+					return []int{1}
+				}
+			},
+			assertExpected: func(streamID int, meta *execinfrapb.ProducerMetadata, stage int) {
+				if meta.RowNum.RowNum != int32(stage) {
+					t.Fatalf("stream %d: unexpected meta %v, expected RowNum=%d in stage %d", streamID, meta, stage, stage)
+				}
+			},
+		},
+	}
 
-			// Metadata should switch to going to stream 1 once the new status is
-			// observed.
-			testutils.SucceedsSoon(t, func() error {
-				consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err3})
-				if consumerStatus != execinfra.NeedMoreRows {
-					t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
+	for _, tc := range testCases {
+		for _, metaConfig := range metaConfigs {
+			t.Run(fmt.Sprintf("%s/%s", tc.name, metaConfig.name), func(t *testing.T) {
+				chans := make([]execinfra.RowChannel, 2)
+				recvs := make([]execinfra.RowReceiver, 2)
+				tc.spec.Streams = make([]execinfrapb.StreamEndpointSpec, 2)
+				for i := 0; i < 2; i++ {
+					chans[i].InitWithBufSizeAndNumSenders(nil /* no column types */, 1, 1)
+					recvs[i] = &chans[i]
+					tc.spec.Streams[i] = execinfrapb.StreamEndpointSpec{StreamID: execinfrapb.StreamID(i)}
 				}
-				// Receive on stream 1 if there is a message waiting. Metadata may still
-				// try to go to 0 for a little while.
-				select {
-				case d := <-chans[1].C:
-					if d.Meta.Err != err3 {
-						t.Fatalf("unexpected meta.Err %v, expected %s", d.Meta.Err, err3)
+				router, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, nil /* no columns */, recvs)
+
+				stage := 1
+				for i := 0; i < 10; i++ {
+					consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
+					if consumerStatus != execinfra.NeedMoreRows {
+						t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
+					}
+					for _, streamID := range metaConfig.getReceiverStreamIDs(stage) {
+						_, meta := chans[streamID].Next()
+						metaConfig.assertExpected(streamID, meta, stage)
 					}
-					return nil
-				default:
-					return errors.Errorf("no metadata on stream 1")
 				}
-			})
+				chans[0].ConsumerDone()
 
-			chans[1].ConsumerClosed()
+				stage = 2
+				for i := 0; i < 10; i++ {
+					consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
+					if consumerStatus != execinfra.NeedMoreRows {
+						t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
+					}
+					for _, streamID := range metaConfig.getReceiverStreamIDs(stage) {
+						_, meta := chans[streamID].Next()
+						metaConfig.assertExpected(streamID, meta, stage)
+					}
+				}
+				chans[0].ConsumerClosed()
 
-			// Start drain the channels in the background.
-			for i := range chans {
-				go drainRowChannel(&chans[i])
-			}
+				stage = 3
+				testutils.SucceedsSoon(t, func() error {
+					consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
+					if consumerStatus != execinfra.NeedMoreRows {
+						t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
+					}
+					// Receive on stream 1 if there is a message waiting. Metadata may still
+					// try to go to 0 for a little while.
+					select {
+					case d := <-chans[1].C:
+						metaConfig.assertExpected(1 /* streamID */, d.Meta, stage)
+						return nil
+					default:
+						return errors.Errorf("no metadata on stream 1")
+					}
+				})
+				chans[1].ConsumerClosed()
 
-			testutils.SucceedsSoon(t, func() error {
-				consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err4})
-				if consumerStatus != execinfra.ConsumerClosed {
-					return fmt.Errorf("expected status %d, got: %d", execinfra.ConsumerClosed, consumerStatus)
+				stage = 4
+				// Start draining the channels in the background.
+				for i := range chans {
+					go drainRowChannel(&chans[i])
 				}
-				return nil
-			})
+				testutils.SucceedsSoon(t, func() error {
+					consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
+					if consumerStatus != execinfra.ConsumerClosed {
+						return fmt.Errorf("expected status %d, got: %d", execinfra.ConsumerClosed, consumerStatus)
+					}
+					return nil
+				})
 
-			router.ProducerDone()
+				router.ProducerDone()
 
-			wg.Wait()
-		})
+				wg.Wait()
+			})
+		}
 	}
 }
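The forwarding policy that this patch implements is easy to state on its own: an error record fans out to every stream that is still open, while any other metadata record stops at the first open stream. The standalone Go sketch below illustrates that policy under simplified assumptions; `meta`, `output`, and the free-standing `fwdMetadata` here are illustrative stand-ins, not the real `execinfrapb.ProducerMetadata` or router output types:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// meta loosely mirrors the shape of a producer metadata record for this
// sketch: it carries either an error or some other payload.
type meta struct {
	err   error
	other string
}

// output is a simplified stand-in for a router's per-stream output state.
type output struct {
	mu     sync.Mutex
	closed bool
	queue  []*meta
}

// fwdMetadata applies the policy from the patch: an error is broadcast to
// every non-closed stream, while any other metadata record goes only to the
// first non-closed stream. It reports whether anything was forwarded; a
// false return means every stream is closed.
func fwdMetadata(outputs []*output, m *meta) bool {
	if m.err != nil {
		forwarded := false
		for _, o := range outputs {
			o.mu.Lock()
			if !o.closed {
				// Each open stream gets its own copy of the error record.
				o.queue = append(o.queue, &meta{err: m.err})
				forwarded = true
			}
			o.mu.Unlock()
		}
		return forwarded
	}
	for _, o := range outputs {
		o.mu.Lock()
		if !o.closed {
			o.queue = append(o.queue, m)
			o.mu.Unlock()
			return true
		}
		o.mu.Unlock()
	}
	return false
}

func main() {
	outputs := []*output{{}, {}, {closed: true}}
	// Non-error metadata lands on stream 0 only.
	fwdMetadata(outputs, &meta{other: "trace payload"})
	// An error lands on both open streams, never on the closed one.
	fwdMetadata(outputs, &meta{err: errors.New("boom")})
	for i, o := range outputs {
		fmt.Printf("stream %d received %d record(s)\n", i, len(o.queue))
	}
}

Running the sketch prints that stream 0 holds two records (the non-error payload plus the error), stream 1 holds only the error, and the closed stream 2 holds nothing, which is exactly the asymmetry the new `fwdErrMetadata` introduces.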
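The one-line change in `mirrorRouter.Push` is subtler: forwarding can itself flip the aggregated status to `ConsumerClosed` once every stream is closed, so returning the status that was read before `fwdMetadata` ran would tell the producer to keep pushing for at least one extra round. A stripped-down model of that handshake follows; the types and methods are hypothetical stand-ins, not the real `mirrorRouter` API:

package main

import (
	"fmt"
	"sync/atomic"
)

type consumerStatus uint32

const (
	needMoreRows consumerStatus = iota
	consumerClosed
)

// router only models the aggregated status word that the real router keeps.
type router struct {
	aggregatedStatus uint32
}

func (r *router) aggStatus() consumerStatus {
	return consumerStatus(atomic.LoadUint32(&r.aggregatedStatus))
}

// fwdMetadata stands in for the real forwarding step: when no stream can
// accept the record anymore, it marks the whole router as closed.
func (r *router) fwdMetadata(allStreamsClosed bool) {
	if allStreamsClosed {
		atomic.StoreUint32(&r.aggregatedStatus, uint32(consumerClosed))
	}
}

// push mirrors the fixed Push: the status is re-read after forwarding,
// because fwdMetadata may have just observed that every stream is closed.
func (r *router) push(hasMeta, allStreamsClosed bool) consumerStatus {
	aggStatus := r.aggStatus()
	if hasMeta {
		r.fwdMetadata(allStreamsClosed)
		return r.aggStatus() // re-read instead of returning the stale aggStatus
	}
	return aggStatus
}

func main() {
	r := &router{}
	fmt.Println(r.push(true, false)) // 0: needMoreRows, streams still open
	fmt.Println(r.push(true, true))  // 1: consumerClosed, visible on this very call
}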