diff --git a/pkg/sql/rowflow/routers.go b/pkg/sql/rowflow/routers.go
index 051013db7456..0cffea06bd61 100644
--- a/pkg/sql/rowflow/routers.go
+++ b/pkg/sql/rowflow/routers.go
@@ -404,32 +404,63 @@ func (rb *routerBase) updateStreamState(
 	}
 }
 
-// fwdMetadata forwards a metadata record to the first stream that's still
-// accepting data.
+// fwdMetadata forwards a metadata record to streams that are still accepting
+// data. Note that if the metadata record contains an error, it is propagated
+// to all non-closed streams whereas all other types of metadata are propagated
+// only to the first non-closed stream.
 func (rb *routerBase) fwdMetadata(meta *execinfrapb.ProducerMetadata) {
 	if meta == nil {
 		log.Fatalf(context.TODO(), "asked to fwd empty metadata")
 	}
 
 	rb.semaphore <- struct{}{}
+	defer func() {
+		<-rb.semaphore
+	}()
+	if metaErr := meta.Err; metaErr != nil {
+		// Forward the error to all non-closed streams.
+		if rb.fwdErrMetadata(metaErr) {
+			return
+		}
+	} else {
+		// Forward the metadata to the first non-closed stream.
+		for i := range rb.outputs {
+			ro := &rb.outputs[i]
+			ro.mu.Lock()
+			if ro.mu.streamStatus != execinfra.ConsumerClosed {
+				ro.addMetadataLocked(meta)
+				ro.mu.Unlock()
+				ro.mu.cond.Signal()
+				return
+			}
+			ro.mu.Unlock()
+		}
+	}
+	// If we got here it means that we couldn't even forward metadata anywhere;
+	// all streams are closed.
+	atomic.StoreUint32(&rb.aggregatedStatus, uint32(execinfra.ConsumerClosed))
+}
 
+// fwdErrMetadata forwards err to all non-closed streams and returns a boolean
+// indicating whether it was sent on at least one stream. Note that this method
+// assumes that rb.semaphore has been acquired and leaves it up to the caller
+// to release it.
+func (rb *routerBase) fwdErrMetadata(err error) bool {
+	forwarded := false
 	for i := range rb.outputs {
 		ro := &rb.outputs[i]
 		ro.mu.Lock()
 		if ro.mu.streamStatus != execinfra.ConsumerClosed {
+			meta := &execinfrapb.ProducerMetadata{Err: err}
 			ro.addMetadataLocked(meta)
 			ro.mu.Unlock()
 			ro.mu.cond.Signal()
-			<-rb.semaphore
-			return
+			forwarded = true
+		} else {
+			ro.mu.Unlock()
 		}
-		ro.mu.Unlock()
 	}
-
-	<-rb.semaphore
-	// If we got here it means that we couldn't even forward metadata anywhere;
-	// all streams are closed.
-	atomic.StoreUint32(&rb.aggregatedStatus, uint32(execinfra.ConsumerClosed))
+	return forwarded
 }
 
 func (rb *routerBase) shouldUseSemaphore() bool {
@@ -490,7 +521,8 @@ func (mr *mirrorRouter) Push(
 	aggStatus := mr.aggStatus()
 	if meta != nil {
 		mr.fwdMetadata(meta)
-		return aggStatus
+		// fwdMetadata can change the status; re-read it.
+		return mr.aggStatus()
 	}
 	if aggStatus != execinfra.NeedMoreRows {
 		return aggStatus
diff --git a/pkg/sql/rowflow/routers_test.go b/pkg/sql/rowflow/routers_test.go
index 745b97289b93..2e1a6ce1ebcb 100644
--- a/pkg/sql/rowflow/routers_test.go
+++ b/pkg/sql/rowflow/routers_test.go
@@ -16,6 +16,7 @@ import (
 	"fmt"
 	"math"
 	"strconv"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -442,9 +443,9 @@ func preimageAttack(
 	}
 }
 
-// Test that metadata records get forwarded by routers. Regardless of the type
-// of router, the records are supposed to be forwarded on the first output
-// stream that's not closed.
+// Test that metadata records get forwarded by routers. Depending on the type
+// of the metadata, it might need to be forwarded to either one or all
+// non-closed streams (regardless of the type of the router).
 func TestMetadataIsForwarded(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 
@@ -473,89 +474,144 @@ func TestMetadataIsForwarded(t *testing.T) {
 		},
 	}
 
-	for _, tc := range testCases {
-		t.Run(tc.name, func(t *testing.T) {
-			chans := make([]execinfra.RowChannel, 2)
-			recvs := make([]execinfra.RowReceiver, 2)
-			tc.spec.Streams = make([]execinfrapb.StreamEndpointSpec, 2)
-			for i := 0; i < 2; i++ {
-				chans[i].InitWithBufSizeAndNumSenders(nil /* no column types */, 1, 1)
-				recvs[i] = &chans[i]
-				tc.spec.Streams[i] = execinfrapb.StreamEndpointSpec{StreamID: execinfrapb.StreamID(i)}
-			}
-			router, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, nil /* no columns */, recvs)
-
-			err1 := errors.Errorf("test error 1")
-			err2 := errors.Errorf("test error 2")
-			err3 := errors.Errorf("test error 3")
-			err4 := errors.Errorf("test error 4")
-
-			// Push metadata; it should go to stream 0.
-			for i := 0; i < 10; i++ {
-				consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err1})
-				if consumerStatus != execinfra.NeedMoreRows {
-					t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
+	// metaConfigs describes the different configurations of metadata handling.
+	// It assumes that the test works in 4 stages with the following events:
+	// - stage 1 is finished with a ConsumerDone() call on stream 0
+	// - stage 2 is finished with a ConsumerClosed() call on stream 0 (at this
+	//   point only stream 1 is non-closed)
+	// - stage 3 is finished with a ConsumerClosed() call on stream 1 (at this
+	//   point all streams are closed, which is the state stage 4 runs in).
+	metaConfigs := []struct {
+		name    string
+		getMeta func(stage int) *execinfrapb.ProducerMetadata
+		// getReceiverStreamIDs returns the streamIDs of streams that are
+		// expected to receive the metadata on the given stage.
+		getReceiverStreamIDs func(stage int) []int
+		assertExpected       func(streamID int, meta *execinfrapb.ProducerMetadata, stage int)
+	}{
+		{
+			name: "error",
+			getMeta: func(stage int) *execinfrapb.ProducerMetadata {
+				return &execinfrapb.ProducerMetadata{
+					Err: errors.Errorf("test error %d", stage),
 				}
-				_, meta := chans[0].Next()
-				if meta.Err != err1 {
-					t.Fatalf("unexpected meta.Err %v, expected %s", meta.Err, err1)
+			},
+			getReceiverStreamIDs: func(stage int) []int {
+				switch stage {
+				case 1, 2:
+					// Errors are propagated to all non-closed streams.
+					return []int{0, 1}
+				default:
+					// Stream 0 is closed after stage 2, so now only stream 1
+					// is expected to receive metadata.
+					return []int{1}
 				}
-			}
-
-			chans[0].ConsumerDone()
-			// Push metadata; it should still go to stream 0.
-			for i := 0; i < 10; i++ {
-				consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err2})
-				if consumerStatus != execinfra.NeedMoreRows {
-					t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
+			},
+			assertExpected: func(streamID int, meta *execinfrapb.ProducerMetadata, stage int) {
+				expected := fmt.Sprintf("test error %d", stage)
+				if !strings.Contains(meta.Err.Error(), expected) {
+					t.Fatalf("stream %d: unexpected meta.Err %v, expected %s", streamID, meta.Err, expected)
 				}
-				_, meta := chans[0].Next()
-				if meta.Err != err2 {
-					t.Fatalf("unexpected meta.Err %v, expected %s", meta.Err, err2)
+			},
+		},
+		{
+			name: "non-error",
+			getMeta: func(stage int) *execinfrapb.ProducerMetadata {
+				return &execinfrapb.ProducerMetadata{
+					RowNum: &execinfrapb.RemoteProducerMetadata_RowNum{RowNum: int32(stage)},
 				}
-			}
-
-			chans[0].ConsumerClosed()
+			},
+			getReceiverStreamIDs: func(stage int) []int {
+				switch stage {
+				case 1, 2:
+					return []int{0}
+				default:
+					return []int{1}
+				}
+			},
+			assertExpected: func(streamID int, meta *execinfrapb.ProducerMetadata, stage int) {
+				if meta.RowNum.RowNum != int32(stage) {
+					t.Fatalf("streamID %d: unexpected meta %v, expected RowNum=%d in stage %d", streamID, meta, stage, stage)
+				}
+			},
+		},
+	}
 
-			// Metadata should switch to going to stream 1 once the new status is
-			// observed.
-			testutils.SucceedsSoon(t, func() error {
-				consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err3})
-				if consumerStatus != execinfra.NeedMoreRows {
-					t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
+	for _, tc := range testCases {
+		for _, metaConfig := range metaConfigs {
+			t.Run(fmt.Sprintf("%s/%s", tc.name, metaConfig.name), func(t *testing.T) {
+				chans := make([]execinfra.RowChannel, 2)
+				recvs := make([]execinfra.RowReceiver, 2)
+				tc.spec.Streams = make([]execinfrapb.StreamEndpointSpec, 2)
+				for i := 0; i < 2; i++ {
+					chans[i].InitWithBufSizeAndNumSenders(nil /* no column types */, 1, 1)
+					recvs[i] = &chans[i]
+					tc.spec.Streams[i] = execinfrapb.StreamEndpointSpec{StreamID: execinfrapb.StreamID(i)}
 				}
-				// Receive on stream 1 if there is a message waiting. Metadata may still
-				// try to go to 0 for a little while.
-				select {
-				case d := <-chans[1].C:
-					if d.Meta.Err != err3 {
-						t.Fatalf("unexpected meta.Err %v, expected %s", d.Meta.Err, err3)
+				router, wg := setupRouter(t, st, evalCtx, diskMonitor, tc.spec, nil /* no columns */, recvs)
+
+				stage := 1
+				for i := 0; i < 10; i++ {
+					consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
+					if consumerStatus != execinfra.NeedMoreRows {
+						t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
+					}
+					for _, streamID := range metaConfig.getReceiverStreamIDs(stage) {
+						_, meta := chans[streamID].Next()
+						metaConfig.assertExpected(streamID, meta, stage)
 					}
-					return nil
-				default:
-					return errors.Errorf("no metadata on stream 1")
 				}
-			})
+				chans[0].ConsumerDone()
 
-			chans[1].ConsumerClosed()
+				stage = 2
+				for i := 0; i < 10; i++ {
+					consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
+					if consumerStatus != execinfra.NeedMoreRows {
+						t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
+					}
+					for _, streamID := range metaConfig.getReceiverStreamIDs(stage) {
+						_, meta := chans[streamID].Next()
+						metaConfig.assertExpected(streamID, meta, stage)
+					}
+				}
+				chans[0].ConsumerClosed()
 
-			// Start drain the channels in the background.
-			for i := range chans {
-				go drainRowChannel(&chans[i])
-			}
+				stage = 3
+				testutils.SucceedsSoon(t, func() error {
+					consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
+					if consumerStatus != execinfra.NeedMoreRows {
+						t.Fatalf("expected status %d, got: %d", execinfra.NeedMoreRows, consumerStatus)
+					}
+					// Receive on stream 1 if there is a message waiting. Metadata may still
+					// try to go to 0 for a little while.
+					select {
+					case d := <-chans[1].C:
+						metaConfig.assertExpected(1 /* streamID */, d.Meta, stage)
+						return nil
+					default:
+						return errors.Errorf("no metadata on stream 1")
+					}
+				})
+				chans[1].ConsumerClosed()
 
-			testutils.SucceedsSoon(t, func() error {
-				consumerStatus := router.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: err4})
-				if consumerStatus != execinfra.ConsumerClosed {
-					return fmt.Errorf("expected status %d, got: %d", execinfra.ConsumerClosed, consumerStatus)
+				stage = 4
+				// Start draining the channels in the background.
+				for i := range chans {
+					go drainRowChannel(&chans[i])
 				}
-				return nil
-			})
+				testutils.SucceedsSoon(t, func() error {
+					consumerStatus := router.Push(nil /* row */, metaConfig.getMeta(stage))
+					if consumerStatus != execinfra.ConsumerClosed {
+						return fmt.Errorf("expected status %d, got: %d", execinfra.ConsumerClosed, consumerStatus)
+					}
+					return nil
+				})
 
-			router.ProducerDone()
+				router.ProducerDone()
 
-			wg.Wait()
-		})
+				wg.Wait()
+			})
+		}
 	}
 }
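Note: stripped of the router's synchronization (the rb.semaphore handoff and the per-output mutex/cond.Signal), the policy this patch introduces reduces to a small dispatch rule. The standalone Go sketch below illustrates that rule only; the stream and forward names are hypothetical stand-ins for routerBase's outputs and for fwdMetadata/fwdErrMetadata, and appending to a slice stands in for addMetadataLocked plus the condition-variable signal.

package main

import (
	"errors"
	"fmt"
)

// meta loosely mirrors execinfrapb.ProducerMetadata: it carries either an
// error or some other payload.
type meta struct {
	err     error
	payload string
}

// stream is a hypothetical stand-in for one router output; closed corresponds
// to streamStatus == execinfra.ConsumerClosed.
type stream struct {
	id     int
	closed bool
	buf    []meta
}

// forward applies the policy from fwdMetadata: error metadata fans out to
// every non-closed stream, while any other metadata goes only to the first
// non-closed stream. It returns false when nothing could be forwarded, the
// point at which the real router marks itself ConsumerClosed.
func forward(streams []*stream, m meta) bool {
	if m.err != nil {
		forwarded := false
		for _, s := range streams {
			if !s.closed {
				// Each receiver gets its own copy, mirroring the fresh
				// ProducerMetadata{Err: err} built per stream in
				// fwdErrMetadata.
				s.buf = append(s.buf, meta{err: m.err})
				forwarded = true
			}
		}
		return forwarded
	}
	for _, s := range streams {
		if !s.closed {
			s.buf = append(s.buf, m)
			return true
		}
	}
	return false
}

func main() {
	streams := []*stream{{id: 0}, {id: 1}}

	// Non-error metadata lands only on stream 0, the first open stream.
	forward(streams, meta{payload: "rownum"})

	// Error metadata lands on both open streams.
	forward(streams, meta{err: errors.New("boom")})

	// Once stream 0 is closed, everything goes to stream 1 alone.
	streams[0].closed = true
	forward(streams, meta{err: errors.New("boom 2")})

	for _, s := range streams {
		fmt.Printf("stream %d received %d records\n", s.id, len(s.buf))
	}

	// With all streams closed, forward reports false; the router would now
	// flip its aggregated status to ConsumerClosed.
	streams[1].closed = true
	fmt.Println("forwarded:", forward(streams, meta{err: errors.New("boom 3")}))
}

Running the sketch, each stream ends up with two records, and the final call reports forwarded == false. That is the case in which fwdMetadata flips aggregatedStatus to execinfra.ConsumerClosed, which is why mirrorRouter.Push above re-reads the status after forwarding instead of returning the stale aggStatus.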