Skip to content

Commit

Permalink
sql/distsqlrun: refactor tableReader to implement RowSource
Browse files Browse the repository at this point in the history
Refactor tableReader to implement the RowSource interface. Refactor
tableReader.Run() to be implemented in terms of
tableReader.Next() (i.e. the RowSource interface).

Adjusted BenchmarkTableReader to avoid using a RowBuffer. This shows the
benefit that can be achieved by using TableReader as a RowSource ("old"
below is with the benchmark modified to use a RowChannel).

name           old time/op  new time/op  delta
TableReader-8  11.6ms ± 5%   9.4ms ± 3%  -18.81%  (p=0.000 n=10+10)

See cockroachdb#20550

Release note: None
  • Loading branch information
petermattis committed Dec 28, 2017
1 parent 234e945 commit d4b6e3f
Show file tree
Hide file tree
Showing 4 changed files with 285 additions and 172 deletions.
30 changes: 17 additions & 13 deletions pkg/sql/distsqlrun/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ type RowSource interface {
// Types returns the schema for the rows in this source.
Types() []sqlbase.ColumnType

// Next returns the next record that a producer has pushed into this
// RowSource. At most one of the return values will be non-empty. Both of them
// can be empty when the RowSource has been exhausted - no more records are
// coming and any further method calls will be no-ops.
// Next returns the next record from the source. At most one of the return
// values will be non-empty. Both of them can be empty when the RowSource has
// been exhausted - no more records are coming and any further method calls
// will be no-ops.
//
// A ProducerMetadata record may contain an error. In that case, this
// interface is oblivious about the semantics: implementers may continue
Expand All @@ -126,8 +126,8 @@ type RowSource interface {
// RowSource to drain, and separately discard any future data rows.
Next() (sqlbase.EncDatumRow, ProducerMetadata)

// ConsumerDone lets the producer know that we will not need any more data
// rows. The producer is expected to start draining and only send metadata
// ConsumerDone lets the source know that we will not need any more data
// rows. The source is expected to start draining and only send metadata
// rows.
//
// May block. If the consumer of the source stops consuming rows before
Expand All @@ -136,9 +136,8 @@ type RowSource interface {
// all the rows were consumed (i.e. after Next() returned an empty row).
ConsumerDone()

// ConsumerClosed informs the producer that the consumer will not be reading
// any more rows. The producer is expected to shut down without sending
// anything else.
// ConsumerClosed informs the source that the consumer it is done and will
// not make any more calls to Next().
//
// Like ConsumerDone(), if the consumer of the source stops consuming rows
// before Next indicates that there are no more rows, ConsumerDone() and/or
Expand Down Expand Up @@ -186,11 +185,16 @@ func DrainAndForwardMetadata(ctx context.Context, src RowSource, dst RowReceiver
}
}

func sendTraceData(ctx context.Context, dst RowReceiver) {
func getTraceData(ctx context.Context) []tracing.RecordedSpan {
if sp := opentracing.SpanFromContext(ctx); sp != nil {
if rec := tracing.GetRecording(sp); rec != nil {
dst.Push(nil /* row */, ProducerMetadata{TraceData: rec})
}
return tracing.GetRecording(sp)
}
return nil
}

func sendTraceData(ctx context.Context, dst RowReceiver) {
if rec := getTraceData(ctx); rec != nil {
dst.Push(nil /* row */, ProducerMetadata{TraceData: rec})
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/distsqlrun/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type FlowID struct {
type FlowCtx struct {
log.AmbientContext

// Context used for all execution within the flow.
// Created in Start(), canceled in Cleanup().
ctx context.Context

Settings *cluster.Settings

stopper *stop.Stopper
Expand Down Expand Up @@ -147,10 +151,6 @@ type Flow struct {

status flowStatus

// Context used for all execution within the flow.
// Created in Start(), canceled in Cleanup().
ctx context.Context

// Cancel function for ctx. Call this to cancel the flow (safe to be called
// multiple times).
ctxCancel context.CancelFunc
Expand Down
233 changes: 158 additions & 75 deletions pkg/sql/distsqlrun/tablereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"sync"

opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"

"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -53,6 +54,8 @@ var ScrubTypes = []sqlbase.ColumnType{
type tableReader struct {
processorBase

ctx context.Context
span opentracing.Span
flowCtx *FlowCtx

tableDesc sqlbase.TableDescriptor
Expand All @@ -62,6 +65,7 @@ type tableReader struct {
fetcher sqlbase.MultiRowFetcher
alloc sqlbase.DatumAlloc

started bool
isCheck bool
// fetcherResultToColIdx maps RowFetcher results to the column index in
// the TableDescriptor. This is only initialized and used during scrub
Expand All @@ -70,9 +74,14 @@ type tableReader struct {
// indexIdx refers to the index being scanned. This is only used
// during scrub physical checks.
indexIdx int

// consumerStatus is used by the RowSource interface to signal that the
// consumer is done accepting rows or is no longer accepting data.
consumerStatus ConsumerStatus
}

var _ Processor = &tableReader{}
var _ RowSource = &tableReader{}

// newTableReader creates a tableReader.
func newTableReader(
Expand Down Expand Up @@ -186,96 +195,43 @@ func initRowFetcher(
return index, isSecondaryIndex, nil
}

// sendMisplannedRangesMetadata sends information about the non-local ranges
// that were read by this tableReader. This should be called after the fetcher
// was used to read everything this tableReader was supposed to read.
func (tr *tableReader) sendMisplannedRangesMetadata(ctx context.Context) {
misplannedRanges := misplannedRanges(ctx, tr.fetcher.GetRangeInfo(), tr.flowCtx.nodeID)

if len(misplannedRanges) != 0 {
tr.out.output.Push(nil /* row */, ProducerMetadata{Ranges: misplannedRanges})
}
}

// Run is part of the processor interface.
func (tr *tableReader) Run(ctx context.Context, wg *sync.WaitGroup) {
if wg != nil {
defer wg.Done()
if tr.out.output == nil {
panic("output RowReceiver not initialized for emitting rows")
}

ctx = log.WithLogTagInt(ctx, "TableReader", int(tr.tableDesc.ID))
ctx, span := processorSpan(ctx, "table reader")
defer tracing.FinishSpan(span)

txn := tr.flowCtx.txn
if txn == nil {
log.Fatalf(ctx, "tableReader outside of txn")
}

log.VEventf(ctx, 1, "starting")
if log.V(1) {
defer log.Infof(ctx, "exiting")
if ctx != tr.flowCtx.ctx {
panic("unexpected context")
}

// TODO(radu,andrei,knz): set the traceKV flag when requested by the session.
if err := tr.fetcher.StartScan(
ctx, txn, tr.spans, true /* limit batches */, tr.limitHint, false, /* traceKV */
); err != nil {
log.Errorf(ctx, "scan error: %s", err)
tr.out.output.Push(nil /* row */, ProducerMetadata{Err: err})
tr.out.Close()
return
if wg != nil {
defer wg.Done()
}

for {
var row sqlbase.EncDatumRow
var err error
if !tr.isCheck {
row, _, _, err = tr.fetcher.NextRow(ctx)
} else {
// If we are running a scrub physical check, we use a specialized
// procedure that runs additional checks while fetching the row
// data.
row, err = tr.fetcher.NextRowWithErrors(ctx)
// There are four cases that can happen after NextRowWithErrors:
// 1) We encounter a ScrubError. We do not propagate the error up,
// but instead generate and emit a row for the final results.
// 2) No errors were found. We simply continue scanning the data
// and discard the row values, as they are not needed for any
// results.
// 3) A non-scrub error was encountered. This was not considered a
// physical data error, and so we propagate this to the user
// immediately.
// 4) There was no error or row data. This signals that there is
// no more data to scan.
//
// NB: Cases 3 and 4 are handled further below, in the standard
// table scanning code path.
if v, ok := err.(*scrub.Error); ok {
row, err = tr.generateScrubErrorRow(row, v)
} else if err == nil && row != nil {
continue
row, meta := tr.Next()
// Emit the row; stop if no more rows are needed.
if row != nil || !meta.Empty() {
status := tr.out.output.Push(row, meta)
if status != NeedMoreRows {
if status == ConsumerClosed {
tr.close()
}
break
}
}
if err != nil || row == nil {
if err != nil {
err = scrub.UnwrapScrubError(err)
tr.out.output.Push(nil /* row */, ProducerMetadata{Err: err})
}
if row == nil {
break
}
// Emit the row; stop if no more rows are needed.
consumerStatus, err := tr.out.EmitRow(ctx, row)
if err != nil || consumerStatus != NeedMoreRows {
if err != nil {
tr.out.output.Push(nil /* row */, ProducerMetadata{Err: err})
}
break
}

if tr.consumerStatus != ConsumerClosed {
if meta := tr.producerMeta(nil); !meta.Empty() {
tr.out.output.Push(nil, meta)
}
}
tr.sendMisplannedRangesMetadata(ctx)
sendTraceData(ctx, tr.out.output)
tr.out.Close()
tr.close()
}

// generateScrubErrorRow will create an EncDatumRow describing a
Expand Down Expand Up @@ -347,3 +303,130 @@ func (tr *tableReader) prettyPrimaryKeyValues(
primaryKeyValues.WriteByte(')')
return primaryKeyValues.String()
}

// close the tableReader and finish any tracing. Any subsequent calls to Next()
// will return empty data.
func (tr *tableReader) close() {
if tr.ctx != nil {
if log.V(1) {
log.Infof(tr.ctx, "exiting")
}
tracing.FinishSpan(tr.span)
tr.ctx, tr.span = nil, nil
}
// This prevents Next() from returning more rows.
tr.out.rowIdx = tr.out.maxRowIdx
}

// producerMeta constructs the ProducerMetadata after consumption of rows has
// terminated, either due to being indicated by the consumer, or because the
// tableReader ran out of rows or encountered an error.
func (tr *tableReader) producerMeta(err error) ProducerMetadata {
var meta ProducerMetadata
if tr.ctx != nil {
meta = ProducerMetadata{
Err: err,
Ranges: misplannedRanges(tr.ctx, tr.fetcher.GetRangeInfo(), tr.flowCtx.nodeID),
TraceData: getTraceData(tr.ctx),
}
// We need to close as soon as we send producer metadata as we're done
// sending rows. The consumer is allowed to not call ConsumerDone().
tr.close()
}
return meta
}

func (tr *tableReader) Types() []sqlbase.ColumnType {
return tr.out.outputTypes
}

func (tr *tableReader) Next() (sqlbase.EncDatumRow, ProducerMetadata) {
if !tr.started {
tr.started = true

if tr.flowCtx.txn == nil {
log.Fatalf(tr.ctx, "tableReader outside of txn")
}

if tr.ctx == nil {
tr.ctx = log.WithLogTagInt(tr.flowCtx.ctx, "TableReader", int(tr.tableDesc.ID))
tr.ctx, tr.span = processorSpan(tr.ctx, "table reader")
log.VEventf(tr.ctx, 1, "starting")
}

// TODO(radu,andrei,knz): set the traceKV flag when requested by the session.
if err := tr.fetcher.StartScan(
tr.ctx, tr.flowCtx.txn, tr.spans,
true /* limit batches */, tr.limitHint, false, /* traceKV */
); err != nil {
log.Errorf(tr.ctx, "scan error: %s", err)
return nil, tr.producerMeta(err)
}
}

if tr.out.rowIdx == tr.out.maxRowIdx || tr.consumerStatus != NeedMoreRows {
return nil, tr.producerMeta(nil)
}

for {
var row sqlbase.EncDatumRow
var err error
if !tr.isCheck {
row, _, _, err = tr.fetcher.NextRow(tr.ctx)
} else {
// If we are running a scrub physical check, we use a specialized
// procedure that runs additional checks while fetching the row
// data.
row, err = tr.fetcher.NextRowWithErrors(tr.ctx)
// There are four cases that can happen after NextRowWithErrors:
// 1) We encounter a ScrubError. We do not propagate the error up,
// but instead generate and emit a row for the final results.
// 2) No errors were found. We simply continue scanning the data
// and discard the row values, as they are not needed for any
// results.
// 3) A non-scrub error was encountered. This was not considered a
// physical data error, and so we propagate this to the user
// immediately.
// 4) There was no error or row data. This signals that there is
// no more data to scan.
//
// NB: Cases 3 and 4 are handled further below, in the standard
// table scanning code path.
if v, ok := err.(*scrub.Error); ok {
row, err = tr.generateScrubErrorRow(row, v)
} else if err == nil && row != nil {
continue
}
}
if row == nil || err != nil {
// This was the last-row or an error was encountered, annotate the
// metadata with misplanned ranges and trace data.
return nil, tr.producerMeta(scrub.UnwrapScrubError(err))
}

outRow, status, err := tr.out.ProcessRow(tr.ctx, row)
if outRow == nil && err == nil && status == NeedMoreRows {
continue
}
if err != nil {
return nil, tr.producerMeta(err)
}
return outRow, ProducerMetadata{}
}
}

func (tr *tableReader) ConsumerDone() {
if tr.consumerStatus != NeedMoreRows {
log.Fatalf(context.Background(), "tableReader already done or closed: %d", tr.consumerStatus)
}
tr.consumerStatus = DrainRequested
}

func (tr *tableReader) ConsumerClosed() {
if tr.consumerStatus == ConsumerClosed {
log.Fatalf(context.Background(), "tableReader already closed")
}
tr.consumerStatus = ConsumerClosed
// The consumer is done, Next() will not be called again.
tr.close()
}
Loading

0 comments on commit d4b6e3f

Please sign in to comment.