diff --git a/pkg/sql/distsqlrun/base.go b/pkg/sql/distsqlrun/base.go index 095d8df8fd8b..9af794ee5116 100644 --- a/pkg/sql/distsqlrun/base.go +++ b/pkg/sql/distsqlrun/base.go @@ -108,10 +108,10 @@ type RowSource interface { // Types returns the schema for the rows in this source. Types() []sqlbase.ColumnType - // Next returns the next record that a producer has pushed into this - // RowSource. At most one of the return values will be non-empty. Both of them - // can be empty when the RowSource has been exhausted - no more records are - // coming and any further method calls will be no-ops. + // Next returns the next record from the source. At most one of the return + // values will be non-empty. Both of them can be empty when the RowSource has + // been exhausted - no more records are coming and any further method calls + // will be no-ops. // // A ProducerMetadata record may contain an error. In that case, this // interface is oblivious about the semantics: implementers may continue @@ -126,8 +126,8 @@ type RowSource interface { // RowSource to drain, and separately discard any future data rows. Next() (sqlbase.EncDatumRow, ProducerMetadata) - // ConsumerDone lets the producer know that we will not need any more data - // rows. The producer is expected to start draining and only send metadata + // ConsumerDone lets the source know that we will not need any more data + // rows. The source is expected to start draining and only send metadata // rows. // // May block. If the consumer of the source stops consuming rows before @@ -136,9 +136,8 @@ type RowSource interface { // all the rows were consumed (i.e. after Next() returned an empty row). ConsumerDone() - // ConsumerClosed informs the producer that the consumer will not be reading - // any more rows. The producer is expected to shut down without sending - // anything else. + // ConsumerClosed informs the source that the consumer is done and will + // not make any more calls to Next(). 
// // Like ConsumerDone(), if the consumer of the source stops consuming rows // before Next indicates that there are no more rows, ConsumerDone() and/or @@ -186,11 +185,16 @@ func DrainAndForwardMetadata(ctx context.Context, src RowSource, dst RowReceiver } } -func sendTraceData(ctx context.Context, dst RowReceiver) { +func getTraceData(ctx context.Context) []tracing.RecordedSpan { if sp := opentracing.SpanFromContext(ctx); sp != nil { - if rec := tracing.GetRecording(sp); rec != nil { - dst.Push(nil /* row */, ProducerMetadata{TraceData: rec}) - } + return tracing.GetRecording(sp) + } + return nil +} + +func sendTraceData(ctx context.Context, dst RowReceiver) { + if rec := getTraceData(ctx); rec != nil { + dst.Push(nil /* row */, ProducerMetadata{TraceData: rec}) } } diff --git a/pkg/sql/distsqlrun/flow.go b/pkg/sql/distsqlrun/flow.go index 2c063d58903f..3f1c841e9b6b 100644 --- a/pkg/sql/distsqlrun/flow.go +++ b/pkg/sql/distsqlrun/flow.go @@ -52,6 +52,10 @@ type FlowID struct { type FlowCtx struct { log.AmbientContext + // Context used for all execution within the flow. + // Created in Start(), canceled in Cleanup(). + ctx context.Context + Settings *cluster.Settings stopper *stop.Stopper @@ -147,10 +151,6 @@ type Flow struct { status flowStatus - // Context used for all execution within the flow. - // Created in Start(), canceled in Cleanup(). - ctx context.Context - // Cancel function for ctx. Call this to cancel the flow (safe to be called // multiple times). 
ctxCancel context.CancelFunc diff --git a/pkg/sql/distsqlrun/tablereader.go b/pkg/sql/distsqlrun/tablereader.go index 9619677f226a..c11357f26cbe 100644 --- a/pkg/sql/distsqlrun/tablereader.go +++ b/pkg/sql/distsqlrun/tablereader.go @@ -19,6 +19,7 @@ import ( "context" "sync" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -53,6 +54,8 @@ var ScrubTypes = []sqlbase.ColumnType{ type tableReader struct { processorBase + ctx context.Context + span opentracing.Span flowCtx *FlowCtx tableDesc sqlbase.TableDescriptor @@ -62,6 +65,7 @@ type tableReader struct { fetcher sqlbase.MultiRowFetcher alloc sqlbase.DatumAlloc + started bool isCheck bool // fetcherResultToColIdx maps RowFetcher results to the column index in // the TableDescriptor. This is only initialized and used during scrub @@ -70,9 +74,14 @@ type tableReader struct { // indexIdx refers to the index being scanned. This is only used // during scrub physical checks. indexIdx int + + // consumerStatus is used by the RowSource interface to signal that the + // consumer is done accepting rows or is no longer accepting data. + consumerStatus ConsumerStatus } var _ Processor = &tableReader{} +var _ RowSource = &tableReader{} // newTableReader creates a tableReader. func newTableReader( @@ -186,96 +195,43 @@ func initRowFetcher( return index, isSecondaryIndex, nil } -// sendMisplannedRangesMetadata sends information about the non-local ranges -// that were read by this tableReader. This should be called after the fetcher -// was used to read everything this tableReader was supposed to read. -func (tr *tableReader) sendMisplannedRangesMetadata(ctx context.Context) { - misplannedRanges := misplannedRanges(ctx, tr.fetcher.GetRangeInfo(), tr.flowCtx.nodeID) - - if len(misplannedRanges) != 0 { - tr.out.output.Push(nil /* row */, ProducerMetadata{Ranges: misplannedRanges}) - } -} - // Run is part of the processor interface. 
func (tr *tableReader) Run(ctx context.Context, wg *sync.WaitGroup) { - if wg != nil { - defer wg.Done() + if tr.out.output == nil { + panic("output RowReceiver not initialized for emitting rows") } - - ctx = log.WithLogTagInt(ctx, "TableReader", int(tr.tableDesc.ID)) - ctx, span := processorSpan(ctx, "table reader") - defer tracing.FinishSpan(span) - - txn := tr.flowCtx.txn - if txn == nil { - log.Fatalf(ctx, "tableReader outside of txn") - } - - log.VEventf(ctx, 1, "starting") - if log.V(1) { - defer log.Infof(ctx, "exiting") + if ctx != tr.flowCtx.ctx { + panic("unexpected context") } - // TODO(radu,andrei,knz): set the traceKV flag when requested by the session. - if err := tr.fetcher.StartScan( - ctx, txn, tr.spans, true /* limit batches */, tr.limitHint, false, /* traceKV */ - ); err != nil { - log.Errorf(ctx, "scan error: %s", err) - tr.out.output.Push(nil /* row */, ProducerMetadata{Err: err}) - tr.out.Close() - return + if wg != nil { + defer wg.Done() } for { - var row sqlbase.EncDatumRow - var err error - if !tr.isCheck { - row, _, _, err = tr.fetcher.NextRow(ctx) - } else { - // If we are running a scrub physical check, we use a specialized - // procedure that runs additional checks while fetching the row - // data. - row, err = tr.fetcher.NextRowWithErrors(ctx) - // There are four cases that can happen after NextRowWithErrors: - // 1) We encounter a ScrubError. We do not propagate the error up, - // but instead generate and emit a row for the final results. - // 2) No errors were found. We simply continue scanning the data - // and discard the row values, as they are not needed for any - // results. - // 3) A non-scrub error was encountered. This was not considered a - // physical data error, and so we propagate this to the user - // immediately. - // 4) There was no error or row data. This signals that there is - // no more data to scan. - // - // NB: Cases 3 and 4 are handled further below, in the standard - // table scanning code path. 
- if v, ok := err.(*scrub.Error); ok { - row, err = tr.generateScrubErrorRow(row, v) - } else if err == nil && row != nil { - continue + row, meta := tr.Next() + // Emit the row; stop if no more rows are needed. + if row != nil || !meta.Empty() { + status := tr.out.output.Push(row, meta) + if status != NeedMoreRows { + if status == ConsumerClosed { + tr.close() + } + break } } - if err != nil || row == nil { - if err != nil { - err = scrub.UnwrapScrubError(err) - tr.out.output.Push(nil /* row */, ProducerMetadata{Err: err}) - } + if row == nil { break } - // Emit the row; stop if no more rows are needed. - consumerStatus, err := tr.out.EmitRow(ctx, row) - if err != nil || consumerStatus != NeedMoreRows { - if err != nil { - tr.out.output.Push(nil /* row */, ProducerMetadata{Err: err}) - } - break + } + + if tr.consumerStatus != ConsumerClosed { + if meta := tr.producerMeta(nil); !meta.Empty() { + tr.out.output.Push(nil, meta) } } - tr.sendMisplannedRangesMetadata(ctx) - sendTraceData(ctx, tr.out.output) tr.out.Close() + tr.close() } // generateScrubErrorRow will create an EncDatumRow describing a @@ -347,3 +303,130 @@ func (tr *tableReader) prettyPrimaryKeyValues( primaryKeyValues.WriteByte(')') return primaryKeyValues.String() } + +// close the tableReader and finish any tracing. Any subsequent calls to Next() +// will return empty data. +func (tr *tableReader) close() { + if tr.ctx != nil { + if log.V(1) { + log.Infof(tr.ctx, "exiting") + } + tracing.FinishSpan(tr.span) + tr.ctx, tr.span = nil, nil + } + // This prevents Next() from returning more rows. + tr.out.rowIdx = tr.out.maxRowIdx +} + +// producerMeta constructs the ProducerMetadata after consumption of rows has +// terminated, either due to being indicated by the consumer, or because the +// tableReader ran out of rows or encountered an error. 
+func (tr *tableReader) producerMeta(err error) ProducerMetadata { + var meta ProducerMetadata + if tr.ctx != nil { + meta = ProducerMetadata{ + Err: err, + Ranges: misplannedRanges(tr.ctx, tr.fetcher.GetRangeInfo(), tr.flowCtx.nodeID), + TraceData: getTraceData(tr.ctx), + } + // We need to close as soon as we send producer metadata as we're done + // sending rows. The consumer is allowed to not call ConsumerDone(). + tr.close() + } + return meta +} + +func (tr *tableReader) Types() []sqlbase.ColumnType { + return tr.out.outputTypes +} + +func (tr *tableReader) Next() (sqlbase.EncDatumRow, ProducerMetadata) { + if !tr.started { + tr.started = true + + if tr.ctx == nil { + tr.ctx = log.WithLogTagInt(tr.flowCtx.ctx, "TableReader", int(tr.tableDesc.ID)) + tr.ctx, tr.span = processorSpan(tr.ctx, "table reader") + log.VEventf(tr.ctx, 1, "starting") + } + + if tr.flowCtx.txn == nil { + log.Fatalf(tr.ctx, "tableReader outside of txn") + } + + // TODO(radu,andrei,knz): set the traceKV flag when requested by the session. + if err := tr.fetcher.StartScan( + tr.ctx, tr.flowCtx.txn, tr.spans, + true /* limit batches */, tr.limitHint, false, /* traceKV */ + ); err != nil { + log.Errorf(tr.ctx, "scan error: %s", err) + return nil, tr.producerMeta(err) + } + } + + if tr.out.rowIdx == tr.out.maxRowIdx || tr.consumerStatus != NeedMoreRows { + return nil, tr.producerMeta(nil) + } + + for { + var row sqlbase.EncDatumRow + var err error + if !tr.isCheck { + row, _, _, err = tr.fetcher.NextRow(tr.ctx) + } else { + // If we are running a scrub physical check, we use a specialized + // procedure that runs additional checks while fetching the row + // data. + row, err = tr.fetcher.NextRowWithErrors(tr.ctx) + // There are four cases that can happen after NextRowWithErrors: + // 1) We encounter a ScrubError. We do not propagate the error up, + // but instead generate and emit a row for the final results. + // 2) No errors were found. 
We simply continue scanning the data + // and discard the row values, as they are not needed for any + // results. + // 3) A non-scrub error was encountered. This was not considered a + // physical data error, and so we propagate this to the user + // immediately. + // 4) There was no error or row data. This signals that there is + // no more data to scan. + // + // NB: Cases 3 and 4 are handled further below, in the standard + // table scanning code path. + if v, ok := err.(*scrub.Error); ok { + row, err = tr.generateScrubErrorRow(row, v) + } else if err == nil && row != nil { + continue + } + } + if row == nil || err != nil { + // This was the last-row or an error was encountered, annotate the + // metadata with misplanned ranges and trace data. + return nil, tr.producerMeta(scrub.UnwrapScrubError(err)) + } + + outRow, status, err := tr.out.ProcessRow(tr.ctx, row) + if outRow == nil && err == nil && status == NeedMoreRows { + continue + } + if err != nil { + return nil, tr.producerMeta(err) + } + return outRow, ProducerMetadata{} + } +} + +func (tr *tableReader) ConsumerDone() { + if tr.consumerStatus != NeedMoreRows { + log.Fatalf(context.Background(), "tableReader already done or closed: %d", tr.consumerStatus) + } + tr.consumerStatus = DrainRequested +} + +func (tr *tableReader) ConsumerClosed() { + if tr.consumerStatus == ConsumerClosed { + log.Fatalf(context.Background(), "tableReader already closed") + } + tr.consumerStatus = ConsumerClosed + // The consumer is done, Next() will not be called again. 
+ tr.close() +} diff --git a/pkg/sql/distsqlrun/tablereader_test.go b/pkg/sql/distsqlrun/tablereader_test.go index 2f7bb675dac0..1b2e2a320b2e 100644 --- a/pkg/sql/distsqlrun/tablereader_test.go +++ b/pkg/sql/distsqlrun/tablereader_test.go @@ -16,6 +16,7 @@ package distsqlrun import ( "context" + "fmt" "sort" "testing" @@ -113,41 +114,57 @@ func TestTableReader(t *testing.T) { } for _, c := range testCases { - ts := c.spec - ts.Table = *td + t.Run("", func(t *testing.T) { + for _, rowSource := range []bool{false, true} { + t.Run(fmt.Sprintf("row-source=%t", rowSource), func(t *testing.T) { + ts := c.spec + ts.Table = *td - evalCtx := tree.MakeTestingEvalContext() - defer evalCtx.Stop(context.Background()) - flowCtx := FlowCtx{ - EvalCtx: evalCtx, - Settings: s.ClusterSettings(), - // Pass a DB without a TxnCoordSender. - txn: client.NewTxn(client.NewDB(s.DistSender(), s.Clock()), s.NodeID()), - nodeID: s.NodeID(), - } + evalCtx := tree.MakeTestingEvalContext() + defer evalCtx.Stop(context.Background()) + flowCtx := FlowCtx{ + ctx: context.Background(), + EvalCtx: evalCtx, + Settings: s.ClusterSettings(), + // Pass a DB without a TxnCoordSender. 
+ txn: client.NewTxn(client.NewDB(s.DistSender(), s.Clock()), s.NodeID()), + nodeID: s.NodeID(), + } - out := &RowBuffer{} - tr, err := newTableReader(&flowCtx, &ts, &c.post, out) - if err != nil { - t.Fatal(err) - } - tr.Run(context.Background(), nil) - if !out.ProducerClosed { - t.Fatalf("output RowReceiver not closed") - } + buf := &RowBuffer{} + tr, err := newTableReader(&flowCtx, &ts, &c.post, buf) + if err != nil { + t.Fatal(err) + } - var res sqlbase.EncDatumRows - for { - row := out.NextNoMeta(t) - if row == nil { - break - } - res = append(res, row) - } + var out RowSource + if rowSource { + out = tr + } else { + tr.Run(context.Background(), nil) + if !buf.ProducerClosed { + t.Fatalf("output RowReceiver not closed") + } + out = buf + } - if result := res.String(tr.OutputTypes()); result != c.expected { - t.Errorf("invalid results: %s, expected %s'", result, c.expected) - } + var res sqlbase.EncDatumRows + for { + row, meta := out.Next() + if !meta.Empty() { + t.Fatalf("unexpected metadata: %+v", meta) + } + if row == nil { + break + } + res = append(res, row) + } + if result := res.String(tr.OutputTypes()); result != c.expected { + t.Errorf("invalid results: %s, expected %s'", result, c.expected) + } + }) + } + }) } } @@ -185,6 +202,7 @@ ALTER TABLE t TESTING_RELOCATE VALUES (ARRAY[2], 1), (ARRAY[1], 2), (ARRAY[3], 3 defer evalCtx.Stop(context.Background()) nodeID := tc.Server(0).NodeID() flowCtx := FlowCtx{ + ctx: context.Background(), EvalCtx: evalCtx, Settings: tc.Server(0).ClusterSettings(), // Pass a DB without a TxnCoordSender. 
@@ -200,47 +218,60 @@ ALTER TABLE t TESTING_RELOCATE VALUES (ARRAY[2], 1), (ARRAY[1], 2), (ARRAY[3], 3 OutputColumns: []uint32{0}, } - out := &RowBuffer{} - tr, err := newTableReader(&flowCtx, &spec, &post, out) - if err != nil { - t.Fatal(err) - } - tr.Run(context.TODO(), nil) - if !out.ProducerClosed { - t.Fatalf("output RowReceiver not closed") - } - var res sqlbase.EncDatumRows - var metas []ProducerMetadata - for { - row, meta := out.Next() - if !meta.Empty() { - metas = append(metas, meta) - continue - } - if row == nil { - break - } - res = append(res, row) - } - if len(res) != 3 { - t.Fatalf("expected 3 rows, got: %d", len(res)) - } - if len(metas) != 1 { - t.Fatalf("expected one meta with misplanned ranges, got: %+v", metas) - } - misplannedRanges := metas[0].Ranges - if len(misplannedRanges) != 2 { - t.Fatalf("expected 2 misplanned ranges, got: %+v", misplannedRanges) - } - // The metadata about misplanned ranges can come in any order (it depends on - // the order in which parallel sub-batches complete after having been split by - // DistSender). 
- sort.Slice(misplannedRanges, func(i, j int) bool { - return misplannedRanges[i].Lease.Replica.NodeID < misplannedRanges[j].Lease.Replica.NodeID - }) - if misplannedRanges[0].Lease.Replica.NodeID != 2 || - misplannedRanges[1].Lease.Replica.NodeID != 3 { - t.Fatalf("expected misplanned ranges from nodes 2 and 3, got: %+v", metas[0]) + for _, rowSource := range []bool{false, true} { + t.Run(fmt.Sprintf("row-source=%t", rowSource), func(t *testing.T) { + buf := &RowBuffer{} + tr, err := newTableReader(&flowCtx, &spec, &post, buf) + if err != nil { + t.Fatal(err) + } + + var out RowSource + if rowSource { + out = tr + } else { + tr.Run(context.Background(), nil) + if !buf.ProducerClosed { + t.Fatalf("output RowReceiver not closed") + } + out = buf + } + + var res sqlbase.EncDatumRows + var metas []ProducerMetadata + for { + row, meta := out.Next() + if !meta.Empty() { + metas = append(metas, meta) + continue + } + if row == nil { + break + } + res = append(res, row) + } + + if len(res) != 3 { + t.Fatalf("expected 3 rows, got: %d", len(res)) + } + if len(metas) != 1 { + t.Fatalf("expected one meta with misplanned ranges, got: %+v", metas) + } + misplannedRanges := metas[0].Ranges + if len(misplannedRanges) != 2 { + t.Fatalf("expected 2 misplanned ranges, got: %+v", misplannedRanges) + } + // The metadata about misplanned ranges can come in any order (it depends on + // the order in which parallel sub-batches complete after having been split by + // DistSender). 
+ sort.Slice(misplannedRanges, func(i, j int) bool { + return misplannedRanges[i].Lease.Replica.NodeID < misplannedRanges[j].Lease.Replica.NodeID + }) + if misplannedRanges[0].Lease.Replica.NodeID != 2 || + misplannedRanges[1].Lease.Replica.NodeID != 3 { + t.Fatalf("expected misplanned ranges from nodes 2 and 3, got: %+v", metas[0]) + } + }) } } @@ -260,6 +291,7 @@ func BenchmarkTableReader(b *testing.B) { evalCtx := tree.MakeTestingEvalContext() defer evalCtx.Stop(context.Background()) flowCtx := FlowCtx{ + ctx: context.Background(), EvalCtx: evalCtx, Settings: s.ClusterSettings(), // Pass a DB without a TxnCoordSender. @@ -273,18 +305,12 @@ func BenchmarkTableReader(b *testing.B) { post := PostProcessSpec{} b.ResetTimer() for i := 0; i < b.N; i++ { - out := &RowBuffer{} - tr, err := newTableReader(&flowCtx, &spec, &post, out) + tr, err := newTableReader(&flowCtx, &spec, &post, nil) if err != nil { b.Fatal(err) } - tr.Run(context.Background(), nil) - if !out.ProducerClosed { - b.Fatalf("output RowReceiver not closed") - } - for { - row, meta := out.Next() + row, meta := tr.Next() if !meta.Empty() { b.Fatalf("unexpected metadata: %+v", meta) }