From d90a2ff06a2ead8e6a9db2b6d04d6f49649ca4dd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alfonso=20Subiotto=20Marqu=C3=A9s?=
Date: Mon, 11 Dec 2017 16:07:47 +0100
Subject: [PATCH 1/2] distsql: start adding row batching

Release note: None
---
 pkg/sql/distsqlrun/base.go      | 101 +++++++++++++++++++++++++++++---
 pkg/sql/distsqlrun/base_test.go |   2 +
 2 files changed, 95 insertions(+), 8 deletions(-)

diff --git a/pkg/sql/distsqlrun/base.go b/pkg/sql/distsqlrun/base.go
index c62c99d30787..a06c39a9b9b2 100644
--- a/pkg/sql/distsqlrun/base.go
+++ b/pkg/sql/distsqlrun/base.go
@@ -91,6 +91,12 @@ type RowReceiver interface {
 	ProducerDone()
 }
 
+// RowBatchReceiver is a RowReceiver that can also accept batches of rows.
+type RowBatchReceiver interface {
+	RowReceiver
+	// PushBatch sends a batch of rows to the receiver. It has the same
+	// semantics as RowReceiver.Push.
+	PushBatch(batch RowBatch) ConsumerStatus
+}
+
 // CancellableRowReceiver is a special type of a RowReceiver that can be set to
 // canceled asynchronously (i.e. concurrently or after Push()es and ProducerDone()s).
 // Once canceled, subsequent Push()es return ConsumerClosed. Implemented by distSQLReceiver
@@ -147,6 +153,12 @@ type RowSource interface {
 	ConsumerClosed()
 }
 
+// RowBatchSource is a RowSource that can also produce batches of rows.
+type RowBatchSource interface {
+	RowSource
+	// NextBatch returns the next batch of rows. It has the same semantics
+	// as Next().
+	NextBatch() (RowBatch, ProducerMetadata)
+}
+
 // DrainAndForwardMetadata calls src.ConsumerDone() (thus asking src for
 // draining metadata) and then forwards all the metadata to dst.
 //
@@ -278,6 +290,8 @@ type RowChannelMsg struct {
 	// Only one of these fields will be set.
 	Row  sqlbase.EncDatumRow
 	Meta ProducerMetadata
+	// TODO(asubiotto): Eventually Batch will replace Row.
+	Batch RowBatch
 }
 
 // ProducerMetadata represents a metadata record flowing through a DistSQL flow.
@@ -292,6 +306,24 @@ type ProducerMetadata struct {
 	TraceData []tracing.RecordedSpan
 }
 
+// RowBatchSize is the maximum size of a RowBatch. This number is based on the
+// batch size of the kv fetcher.
+const RowBatchSize = 10000
+
+// RowBatch is a batch of rows that flows through a DistSQL flow as a unit.
+// TODO(asubiotto): Would this make more sense in pkg/sql/sqlbase?
+// TODO(asubiotto): Inefficient representation for now. Might be better to
+// be a [][]byte field or a []byte with row offsets stored alongside and a map
+// of index to any special (non-value) encodings.
+type RowBatch []sqlbase.EncDatumRow
+
+// TODO(asubiotto): For processors that don't work on batches of rows and will
+// be calling Next(), we will want to "unbatch" the rows in the RowSource.
+// CAREFUL: We might be breaking ordering guarantees.
+
+// TODO(asubiotto): Write a RowBatchIterator. The closest existing analog is
+// memRowContainer, which is probably too heavyweight for this; investigate.
+
 // Empty returns true if none of the fields in metadata are populated.
 func (meta ProducerMetadata) Empty() bool {
 	return meta.Ranges == nil && meta.Err == nil && meta.TraceData == nil
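The two TODOs above call for an "unbatching" adapter so that row-at-a-time processors can consume a RowBatchSource. A minimal sketch of what that could look like, assuming only the interfaces in this patch (the type and its name are hypothetical, not part of the diff):

// rowBatchUnbatcher is a hypothetical adapter that feeds a row-at-a-time
// consumer from a RowBatchSource.
type rowBatchUnbatcher struct {
	src     RowBatchSource
	pending RowBatch
}

// Next returns one row at a time, pulling a new batch from the underlying
// source whenever the buffered batch is exhausted. Metadata-only messages
// and the end of the stream are passed through unchanged.
func (u *rowBatchUnbatcher) Next() (sqlbase.EncDatumRow, ProducerMetadata) {
	for len(u.pending) == 0 {
		batch, meta := u.src.NextBatch()
		if batch == nil {
			return nil, meta
		}
		u.pending = batch
	}
	row := u.pending[0]
	u.pending = u.pending[1:]
	return row, ProducerMetadata{}
}

This is essentially what RowChannel.Next() below does with its pendingBatch field; the ordering caveat from the TODO applies as soon as multiple producers feed the same source.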
@@ -311,10 +343,17 @@ type RowChannel struct {
 	// consumerStatus is an atomic that signals whether the RowChannel is only
 	// accepting draining metadata or is no longer accepting any rows via Push.
 	consumerStatus ConsumerStatus
+
+	// pendingBatch is a RowBatch that has been read from the channel but not
+	// yet returned through Next() or NextBatch(). It is only used when the
+	// producer pushes RowBatches but the consumer calls Next() and needs to
+	// be fed one row at a time.
+	// TODO(asubiotto): Remove once rows are no longer sent one by one.
+	pendingBatch RowBatch
 }
 
-var _ RowReceiver = &RowChannel{}
-var _ RowSource = &RowChannel{}
+var _ RowBatchReceiver = &RowChannel{}
+var _ RowBatchSource = &RowChannel{}
 
 // InitWithBufSize initializes the RowChannel with a given buffer size.
 func (rc *RowChannel) InitWithBufSize(types []sqlbase.ColumnType, chanBufSize int) {
@@ -330,15 +369,29 @@ func (rc *RowChannel) Init(types []sqlbase.ColumnType) {
 
 // Push is part of the RowReceiver interface.
 func (rc *RowChannel) Push(row sqlbase.EncDatumRow, meta ProducerMetadata) ConsumerStatus {
+	return rc.pushMsg(RowChannelMsg{Row: row, Meta: meta})
+}
+
+// PushBatch is part of the RowBatchReceiver interface.
+func (rc *RowChannel) PushBatch(batch RowBatch) ConsumerStatus {
+	return rc.pushMsg(RowChannelMsg{Batch: batch})
+}
+
+func (rc *RowChannel) pushMsg(msg RowChannelMsg) ConsumerStatus {
 	consumerStatus := ConsumerStatus(
 		atomic.LoadUint32((*uint32)(&rc.consumerStatus)))
 	switch consumerStatus {
 	case NeedMoreRows:
-		rc.dataChan <- RowChannelMsg{Row: row, Meta: meta}
+		rc.dataChan <- msg
 	case DrainRequested:
 		// If we're draining, only forward metadata.
-		if !meta.Empty() {
-			rc.dataChan <- RowChannelMsg{Meta: meta}
+		msg.Row = nil
+		msg.Batch = nil
+		if !msg.Meta.Empty() {
+			rc.dataChan <- msg
 		}
 	case ConsumerClosed:
 		// If the consumer is gone, swallow all the rows.
@@ -356,14 +409,46 @@ func (rc *RowChannel) Types() []sqlbase.ColumnType {
 	return rc.types
 }
 
-// Next is part of the RowSource interface.
+// Next is part of the RowSource interface. This implementation of Next() is
+// not thread-safe, as it manipulates pendingBatch.
 func (rc *RowChannel) Next() (sqlbase.EncDatumRow, ProducerMetadata) {
+	if len(rc.pendingBatch) != 0 {
+		result := rc.pendingBatch[0]
+		rc.pendingBatch = rc.pendingBatch[1:]
+		return result, ProducerMetadata{}
+	}
+	d := rc.nextMsg()
+	if d.Batch != nil {
+		rc.pendingBatch = d.Batch
+		return rc.Next()
+	}
+	return d.Row, d.Meta
+}
+
+// NextBatch is part of the RowBatchSource interface.
+func (rc *RowChannel) NextBatch() (RowBatch, ProducerMetadata) {
+	if len(rc.pendingBatch) != 0 {
+		result := rc.pendingBatch
+		rc.pendingBatch = nil
+		return result, ProducerMetadata{}
+	}
+	d := rc.nextMsg()
+	if d.Batch != nil {
+		return d.Batch, d.Meta
+	}
+	if d.Row != nil {
+		return RowBatch{d.Row}, d.Meta
+	}
+	return nil, d.Meta
+}
+
+func (rc *RowChannel) nextMsg() RowChannelMsg {
 	d, ok := <-rc.C
 	if !ok {
 		// No more rows.
-		return nil, ProducerMetadata{}
+		return RowChannelMsg{Meta: ProducerMetadata{}}
 	}
-	return d.Row, d.Meta
+	return d
 }
 
 // ConsumerDone is part of the RowSource interface.
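To make the pendingBatch round trip concrete, a usage sketch (not part of the diff; types, row1, and row2 are assumed to be in scope):

	rc := &RowChannel{}
	rc.Init(types)
	go func() {
		// The producer pushes a two-row batch and closes the stream.
		rc.PushBatch(RowBatch{row1, row2})
		rc.ProducerDone()
	}()
	// The consumer reads single rows; Next() transparently unbatches,
	// returning row1 and then row2, each with empty metadata.
	for {
		row, meta := rc.Next()
		if row == nil && meta.Empty() {
			break
		}
	}

The same channel read through NextBatch() would return the whole batch in one call, which is where the synchronization savings of batching come from.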
diff --git a/pkg/sql/distsqlrun/base_test.go b/pkg/sql/distsqlrun/base_test.go
index 5e0d27b49e78..4cccf9d705d6 100644
--- a/pkg/sql/distsqlrun/base_test.go
+++ b/pkg/sql/distsqlrun/base_test.go
@@ -24,6 +24,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
 )
 
+// TODO(asubiotto): Add test and benchmark for RowChannel juggling batches.
+
 // Benchmark a pipeline of RowChannels.
 func BenchmarkRowChannelPipeline(b *testing.B) {
 	columnTypeInt := sqlbase.ColumnType{SemanticType: sqlbase.ColumnType_INT}

From bf13e36b87dbff6edee54c473326785ceb84ad5b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alfonso=20Subiotto=20Marqu=C3=A9s?=
Date: Mon, 11 Dec 2017 18:08:38 +0100
Subject: [PATCH 2/2] distsqlrun: add benchmarks to help decide where to focus
 performance efforts

Release note: None
---
 pkg/sql/distsqlrun/aggregator.go       | 179 ++++++++++++++-----------
 pkg/sql/distsqlrun/base_test.go        | 176 ++++++++++++++++++++++++
 pkg/sql/distsqlrun/tablereader.go      |  47 ++++++-
 pkg/sql/distsqlrun/tablereader_test.go | 107 +++++++++++++++
 4 files changed, 427 insertions(+), 82 deletions(-)

diff --git a/pkg/sql/distsqlrun/aggregator.go b/pkg/sql/distsqlrun/aggregator.go
index 96dfbb9ea936..7c92d2392e98 100644
--- a/pkg/sql/distsqlrun/aggregator.go
+++ b/pkg/sql/distsqlrun/aggregator.go
@@ -90,6 +90,7 @@ func GetAggregateInfo(
 // aggregator's output schema is comprised of what is specified by the
 // accompanying SELECT expressions.
 type aggregator struct {
+	Batching bool
 	processorBase
 
 	flowCtx *FlowCtx
@@ -172,19 +173,21 @@ func newAggregator(
 	return ag, nil
 }
 
+// Close releases the aggregator's resources: the per-bucket aggregation
+// functions and the memory account backing them.
+func (ag *aggregator) Close(ctx context.Context) {
+	for _, f := range ag.funcs {
+		for _, aggFunc := range f.buckets {
+			aggFunc.Close(ctx)
+		}
+	}
+	ag.bucketsAcc.Close(ctx)
+}
+
 // Run is part of the processor interface.
 func (ag *aggregator) Run(ctx context.Context, wg *sync.WaitGroup) {
 	if wg != nil {
 		defer wg.Done()
 	}
-	defer ag.bucketsAcc.Close(ctx)
-	defer func() {
-		for _, f := range ag.funcs {
-			for _, aggFunc := range f.buckets {
-				aggFunc.Close(ctx)
-			}
-		}
-	}()
+	defer ag.Close(ctx)
 
 	ctx = log.WithLogTag(ctx, "Agg", nil)
 	ctx, span := processorSpan(ctx, "aggregator")
@@ -201,7 +204,11 @@ func (ag *aggregator) Run(ctx context.Context, wg *sync.WaitGroup) {
 	}
 
 	log.VEvent(ctx, 1, "accumulation complete")
+	ag.RenderResults(ctx)
+}
 
+// RenderResults renders the accumulated aggregation results and emits them to
+// the aggregator's output.
+func (ag *aggregator) RenderResults(ctx context.Context) {
 	// Queries like `SELECT MAX(n) FROM t` expect a row of NULLs if nothing was
 	// aggregated.
 	if len(ag.buckets) < 1 && len(ag.groupCols) == 0 {
@@ -245,6 +252,31 @@ func (ag *aggregator) Run(ctx context.Context, wg *sync.WaitGroup) {
 // If an error is returned, both the input and the output have been properly
 // closed, and the error has also been forwarded to the output.
 func (ag *aggregator) accumulateRows(ctx context.Context) (err error) {
+	var scratch []byte
+	if ag.Batching {
+		rowChan := ag.input.(*RowChannel)
+		for {
+			batch, meta := rowChan.NextBatch()
+			if batch == nil {
+				if meta.Empty() {
+					// The input has been exhausted.
+					return nil
+				}
+				// A metadata-only message; NextRow forwards it.
+				if cont, err := ag.NextRow(ctx, scratch, nil /* row */, meta); err != nil || !cont {
+					return err
+				}
+				continue
+			}
+			for _, row := range batch {
+				if cont, err := ag.NextRow(ctx, scratch, row, meta); err != nil || !cont {
+					return err
+				}
+			}
+		}
+	}
+	for {
+		row, meta := ag.input.Next()
+		if cont, err := ag.NextRow(ctx, scratch, row, meta); err != nil || !cont {
+			return err
+		}
+	}
+}
+
+// NextRow accumulates a single row or forwards metadata. It returns
+// cont=false when the aggregator should stop consuming its input, i.e. at the
+// end of the stream or when the consumer is gone.
+func (ag *aggregator) NextRow(ctx context.Context, scratch []byte, row sqlbase.EncDatumRow, meta ProducerMetadata) (cont bool, err error) {
 	cleanupRequired := true
 	defer func() {
 		if err != nil {
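Factoring the loop body out into NextRow is what lets a caller drive the aggregator directly, with no RowChannel in between (this is what the ElideRowChan benchmark below does). A sketch of such a driver, assuming an aggregator built with newAggregator (the helper name is hypothetical, not part of the diff):

	// feedAggregator pushes rows straight into the aggregator and then
	// renders the results, bypassing any RowChannel.
	func feedAggregator(ctx context.Context, ag *aggregator, rows sqlbase.EncDatumRows) error {
		var scratch []byte
		for _, row := range rows {
			if cont, err := ag.NextRow(ctx, scratch, row, ProducerMetadata{}); err != nil || !cont {
				return err
			}
		}
		ag.RenderResults(ctx)
		return nil
	}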
@@ -254,85 +286,82 @@ func (ag *aggregator) accumulateRows(ctx context.Context) (err error) {
 			}
 		}
 	}()
-
-	var scratch []byte
-	for {
-		row, meta := ag.input.Next()
-		if !meta.Empty() {
-			if meta.Err != nil {
-				return meta.Err
-			}
-			if !emitHelper(ctx, &ag.out, nil /* row */, meta, ag.input) {
-				// TODO(andrei): here, because we're passing metadata through, we have
-				// an opportunity to find out that the consumer doesn't need the data
-				// any more. If the producer doesn't push any metadata, then there's no
-				// opportunity to find this out until the accumulation phase is done. We
-				// should have a way to periodically peek at the state of the
-				// RowReceiver that's hiding behind the ProcOutputHelper.
-				cleanupRequired = false
-				return errors.Errorf("consumer stopped before it received rows")
-			}
-			continue
-		}
-		if row == nil {
-			return nil
-		}
+	if !meta.Empty() {
+		if meta.Err != nil {
+			return false, meta.Err
+		}
+		if !emitHelper(ctx, &ag.out, nil /* row */, meta, ag.input) {
+			// TODO(andrei): here, because we're passing metadata through, we have
+			// an opportunity to find out that the consumer doesn't need the data
+			// any more. If the producer doesn't push any metadata, then there's no
+			// opportunity to find this out until the accumulation phase is done. We
+			// should have a way to periodically peek at the state of the
+			// RowReceiver that's hiding behind the ProcOutputHelper.
+			cleanupRequired = false
+			return false, errors.Errorf("consumer stopped before it received rows")
+		}
+		return true, nil
+	}
+	if row == nil {
+		return false, nil
+	}
 
-		// The encoding computed here determines which bucket the non-grouping
-		// datums are accumulated to.
-		encoded, err := ag.encode(scratch, row)
-		if err != nil {
-			return err
-		}
+	// The encoding computed here determines which bucket the non-grouping
+	// datums are accumulated to.
+	encoded, err := ag.encode(scratch, row)
+	if err != nil {
+		return false, err
+	}
 
-		if _, ok := ag.buckets[string(encoded)]; !ok {
-			if err := ag.bucketsAcc.Grow(ctx, int64(len(encoded))); err != nil {
-				return err
-			}
-			ag.buckets[string(encoded)] = struct{}{}
-		}
+	if _, ok := ag.buckets[string(encoded)]; !ok {
+		if err := ag.bucketsAcc.Grow(ctx, int64(len(encoded))); err != nil {
+			return false, err
+		}
+		ag.buckets[string(encoded)] = struct{}{}
+	}
 
-		// Feed the func holders for this bucket the non-grouping datums.
-		for i, a := range ag.aggregations {
-			if a.FilterColIdx != nil {
-				col := *a.FilterColIdx
-				if err := row[col].EnsureDecoded(&ag.inputTypes[col], &ag.datumAlloc); err != nil {
-					return err
-				}
-				if row[*a.FilterColIdx].Datum != tree.DBoolTrue {
-					// This row doesn't contribute to this aggregation.
-					continue
-				}
-			}
+	// Feed the func holders for this bucket the non-grouping datums.
+	for i, a := range ag.aggregations {
+		if a.FilterColIdx != nil {
+			col := *a.FilterColIdx
+			if err := row[col].EnsureDecoded(&ag.inputTypes[col], &ag.datumAlloc); err != nil {
+				return false, err
+			}
+			if row[*a.FilterColIdx].Datum != tree.DBoolTrue {
+				// This row doesn't contribute to this aggregation; skip to
+				// the next one. (Note: not `return true, nil`, which would
+				// skip the remaining aggregations for this row.)
+				continue
+			}
+		}
+		// Extract the corresponding arguments from the row to feed into the
+		// aggregate function.
+		// Most functions require at most one argument thus we separate
+		// the first argument and allocation of (if applicable) a variadic
+		// collection of arguments thereafter.
+		var firstArg tree.Datum
+		var otherArgs tree.Datums
+		if len(a.ColIdx) > 1 {
+			otherArgs = make(tree.Datums, len(a.ColIdx)-1)
+		}
+		isFirstArg := true
+		for j, c := range a.ColIdx {
+			if err := row[c].EnsureDecoded(&ag.inputTypes[c], &ag.datumAlloc); err != nil {
+				return false, err
+			}
+			if isFirstArg {
+				firstArg = row[c].Datum
+				isFirstArg = false
+				// Move on to the remaining arguments; returning here would
+				// drop the row before it reaches the aggregation function.
+				continue
+			}
+			otherArgs[j-1] = row[c].Datum
+		}
+
+		if err := ag.funcs[i].add(ctx, encoded, firstArg, otherArgs); err != nil {
+			return false, err
+		}
+	}
 
-			// Extract the corresponding arguments from the row to feed into the
-			// aggregate function.
-			// Most functions require at most one argument thus we separate
-			// the first argument and allocation of (if applicable) a variadic
-			// collection of arguments thereafter.
-			var firstArg tree.Datum
-			var otherArgs tree.Datums
-			if len(a.ColIdx) > 1 {
-				otherArgs = make(tree.Datums, len(a.ColIdx)-1)
-			}
-			isFirstArg := true
-			for j, c := range a.ColIdx {
-				if err := row[c].EnsureDecoded(&ag.inputTypes[c], &ag.datumAlloc); err != nil {
-					return err
-				}
-				if isFirstArg {
-					firstArg = row[c].Datum
-					isFirstArg = false
-					continue
-				}
-				otherArgs[j-1] = row[c].Datum
-			}
-
-			if err := ag.funcs[i].add(ctx, encoded, firstArg, otherArgs); err != nil {
-				return err
-			}
-		}
-		scratch = encoded[:0]
-	}
+	// TODO(asubiotto): scratch is a parameter, so this assignment no longer
+	// carries across rows and the encoding buffer is not reused. Consider
+	// returning the buffer to the caller.
+	scratch = encoded[:0]
+	return true, nil
 }
 
 type aggregateFuncHolder struct {
diff --git a/pkg/sql/distsqlrun/base_test.go b/pkg/sql/distsqlrun/base_test.go
index 4cccf9d705d6..73c13fa9fecd 100644
--- a/pkg/sql/distsqlrun/base_test.go
+++ b/pkg/sql/distsqlrun/base_test.go
@@ -20,8 +20,10 @@ import (
 	"testing"
 	"unsafe"
 
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
+	"golang.org/x/net/context"
 )
 
 // TODO(asubiotto): Add test and benchmark for RowChannel juggling batches.
@@ -71,5 +73,179 @@ func BenchmarkRowChannelPipeline(b *testing.B) {
 		rc[0].ProducerDone()
 		wg.Wait()
 	})
 	}
 }
+
+type callbackFn func(row sqlbase.EncDatumRow, meta ProducerMetadata) ConsumerStatus
+
+// CallbackReceiver is a RowReceiver that invokes fn for every Push.
+type CallbackReceiver struct {
+	fn callbackFn
+}
+
+// NewCallbackReceiver returns a CallbackReceiver that calls fn for every
+// pushed row or metadata record.
+func NewCallbackReceiver(fn callbackFn) *CallbackReceiver {
+	return &CallbackReceiver{fn: fn}
+}
+
+// Push is part of the RowReceiver interface.
+func (c *CallbackReceiver) Push(row sqlbase.EncDatumRow, meta ProducerMetadata) ConsumerStatus {
+	return c.fn(row, meta)
+}
+
+// ProducerDone is part of the RowReceiver interface. It is a no-op.
+func (c *CallbackReceiver) ProducerDone() {}
+
+var _ RowReceiver = &CallbackReceiver{}
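CallbackReceiver turns a consumer into an inline callback on the producer's goroutine, which is what lets the benchmarks below splice an aggregator directly onto a hash joiner's output. A tiny usage sketch (hypothetical; someRow stands for any sqlbase.EncDatumRow):

	count := 0
	recv := NewCallbackReceiver(func(row sqlbase.EncDatumRow, meta ProducerMetadata) ConsumerStatus {
		if row != nil {
			count++
		}
		return NeedMoreRows
	})
	recv.Push(someRow, ProducerMetadata{}) // count == 1 afterwards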
+
+func BenchmarkJoinAndCount(b *testing.B) {
+	ctx := context.Background()
+	evalCtx := tree.MakeTestingEvalContext()
+	defer evalCtx.Stop(ctx)
+	flowCtx := FlowCtx{
+		Settings: cluster.MakeTestingClusterSettings(),
+		EvalCtx:  evalCtx,
+	}
+
+	spec := HashJoinerSpec{
+		LeftEqColumns:  []uint32{0},
+		RightEqColumns: []uint32{0},
+		Type:           JoinType_INNER,
+	}
+	// TODO(asubiotto): Check whether a projection is needed here, e.g.
+	// PostProcessSpec{Projection: true, OutputColumns: []uint32{0}}.
+	post := PostProcessSpec{}
+
+	columnTypeInt := sqlbase.ColumnType{SemanticType: sqlbase.ColumnType_INT}
+
+	// Input will serve as both the left and right input. Every row has the
+	// same column value, so the join produces inputSize^2 rows.
+	const inputSize = 1000
+	input := make(sqlbase.EncDatumRows, inputSize)
+	for i := range input {
+		input[i] = sqlbase.EncDatumRow{sqlbase.DatumToEncDatum(columnTypeInt, tree.NewDInt(tree.DInt(1)))}
+	}
+
+	types := []sqlbase.ColumnType{columnTypeInt}
+
+	leftInput := NewRepeatableRowSource(types, input)
+	rightInput := NewRepeatableRowSource(types, input)
+
+	b.Run("Normal", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			leftInput.Reset()
+			rightInput.Reset()
+			conn := &RowChannel{}
+			conn.Init(types)
+			ag, err := newAggregator(
+				&flowCtx,
+				&AggregatorSpec{
+					Aggregations: []AggregatorSpec_Aggregation{{Func: AggregatorSpec_COUNT_ROWS}},
+				},
+				conn,
+				&post,
+				&RowDisposer{},
+			)
+			if err != nil {
+				b.Fatal(err)
+			}
+			var wg sync.WaitGroup
+			// Add must happen before the goroutine starts so that wg.Wait()
+			// cannot observe a zero count while the aggregator is running.
+			wg.Add(1)
+			go func() {
+				ag.Run(ctx, &wg)
+			}()
+			h, err := newHashJoiner(&flowCtx, &spec, leftInput, rightInput, &post, conn)
+			if err != nil {
+				b.Fatal(err)
+			}
+			h.Run(ctx, nil)
+
+			wg.Wait()
+		}
+	})
+
+	b.Run("ElideRowChan", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			leftInput.Reset()
+			rightInput.Reset()
+			ag, err := newAggregator(
+				&flowCtx,
+				&AggregatorSpec{
+					Aggregations: []AggregatorSpec_Aggregation{{Func: AggregatorSpec_COUNT_ROWS}},
+				},
+				// The input doesn't matter as we're not reading from it.
+				&RepeatableRowSource{},
+				&post,
+				&RowDisposer{},
+			)
+			if err != nil {
+				b.Fatal(err)
+			}
+			var scratch []byte
+			out := NewCallbackReceiver(func(row sqlbase.EncDatumRow, meta ProducerMetadata) ConsumerStatus {
+				if _, err := ag.NextRow(ctx, scratch, row, meta); err != nil {
+					b.Fatal(err)
+				}
+				return NeedMoreRows
+			})
+			h, err := newHashJoiner(&flowCtx, &spec, leftInput, rightInput, &post, out)
+			if err != nil {
+				b.Fatal(err)
+			}
+			h.Run(ctx, nil)
+			ag.RenderResults(ctx)
+			// Close at the end of each iteration rather than in a defer so
+			// that resources don't accumulate across b.N iterations.
+			ag.Close(ctx)
+		}
+	})
+
+	b.Run("RowBatching", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			leftInput.Reset()
+			rightInput.Reset()
+			conn := &RowChannel{}
+			conn.Init(types)
+			ag, err := newAggregator(
+				&flowCtx,
+				&AggregatorSpec{
+					Aggregations: []AggregatorSpec_Aggregation{{Func: AggregatorSpec_COUNT_ROWS}},
+				},
+				conn,
+				&post,
+				&RowDisposer{},
+			)
+			if err != nil {
+				b.Fatal(err)
+			}
+			ag.Batching = true
+			var wg sync.WaitGroup
+			wg.Add(1)
+			go func() {
+				ag.Run(ctx, &wg)
+			}()
+			batch := make(RowBatch, 0, RowBatchSize)
+			h, err := newHashJoiner(
+				&flowCtx,
+				&spec,
+				leftInput,
+				rightInput,
+				&post,
+				NewCallbackReceiver(func(row sqlbase.EncDatumRow, meta ProducerMetadata) ConsumerStatus {
+					if row != nil {
+						batch = append(batch, row)
+						if len(batch) == RowBatchSize {
+							status := conn.PushBatch(batch)
+							batch = make(RowBatch, 0, RowBatchSize)
+							return status
+						}
+						return NeedMoreRows
+					}
+					// Flush buffered rows before forwarding metadata so that
+					// ordering is preserved.
+					if len(batch) > 0 {
+						if status := conn.PushBatch(batch); status != NeedMoreRows {
+							return status
+						}
+						batch = make(RowBatch, 0, RowBatchSize)
+					}
+					return conn.Push(row, meta)
+				}),
+			)
+			if err != nil {
+				b.Fatal(err)
+			}
+			h.Run(ctx, nil)
+			// Flush any partial batch before signaling the end of the stream;
+			// otherwise up to RowBatchSize-1 rows would be dropped.
+			if len(batch) > 0 {
+				conn.PushBatch(batch)
+			}
+			conn.ProducerDone()
+
+			wg.Wait()
+		}
+	})
+}
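The RowBatching variant above open-codes the row-to-batch bridge inside the callback. If this pattern sticks, it could be factored into a small adapter; a sketch under the same interfaces (the type and its name are hypothetical, not part of the diff):

	// batchingReceiver is a hypothetical RowReceiver that buffers pushed rows
	// and forwards them to an underlying RowBatchReceiver in RowBatchSize
	// chunks.
	type batchingReceiver struct {
		dst   RowBatchReceiver
		batch RowBatch
	}

	// Push buffers row; full batches and all metadata are forwarded to dst.
	func (br *batchingReceiver) Push(row sqlbase.EncDatumRow, meta ProducerMetadata) ConsumerStatus {
		if row != nil {
			br.batch = append(br.batch, row)
			if len(br.batch) == RowBatchSize {
				status := br.dst.PushBatch(br.batch)
				br.batch = make(RowBatch, 0, RowBatchSize)
				return status
			}
			return NeedMoreRows
		}
		// Metadata: flush buffered rows first to preserve ordering.
		if len(br.batch) > 0 {
			if status := br.dst.PushBatch(br.batch); status != NeedMoreRows {
				return status
			}
			br.batch = make(RowBatch, 0, RowBatchSize)
		}
		return br.dst.Push(row, meta)
	}

	// ProducerDone flushes any partial batch and closes the downstream
	// receiver.
	func (br *batchingReceiver) ProducerDone() {
		if len(br.batch) > 0 {
			br.dst.PushBatch(br.batch)
			br.batch = nil
		}
		br.dst.ProducerDone()
	}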
diff --git a/pkg/sql/distsqlrun/tablereader.go b/pkg/sql/distsqlrun/tablereader.go
index 7e6bfc59627d..d80b8ecdf797 100644
--- a/pkg/sql/distsqlrun/tablereader.go
+++ b/pkg/sql/distsqlrun/tablereader.go
@@ -51,6 +51,7 @@ var ScrubTypes = []sqlbase.ColumnType{
 // desired column values to an output RowReceiver.
 // See docs/RFCS/distributed_sql.md
 type tableReader struct {
+	Batching bool
 	processorBase
 
 	flowCtx *FlowCtx
@@ -197,6 +198,18 @@ func (tr *tableReader) sendMisplannedRangesMetadata(ctx context.Context) {
 	}
 }
 
+// ReadyForFetch starts the scan that Run (or NextRow) will read from. If the
+// scan cannot be started, the error is pushed to the output, the output is
+// closed, and false is returned.
+func (tr *tableReader) ReadyForFetch(ctx context.Context) bool {
+	if err := tr.fetcher.StartScan(
+		ctx, tr.flowCtx.txn, tr.spans, true /* limit batches */, tr.limitHint, false, /* traceKV */
+	); err != nil {
+		log.Errorf(ctx, "scan error: %s", err)
+		tr.out.output.Push(nil /* row */, ProducerMetadata{Err: err})
+		tr.out.Close()
+		return false
+	}
+	return true
+}
+
 // Run is part of the processor interface.
 func (tr *tableReader) Run(ctx context.Context, wg *sync.WaitGroup) {
 	if wg != nil {
@@ -218,15 +231,30 @@ func (tr *tableReader) Run(ctx context.Context, wg *sync.WaitGroup) {
 	}
 
 	// TODO(radu,andrei,knz): set the traceKV flag when requested by the session.
-	if err := tr.fetcher.StartScan(
-		ctx, txn, tr.spans, true /* limit batches */, tr.limitHint, false, /* traceKV */
-	); err != nil {
-		log.Errorf(ctx, "scan error: %s", err)
-		tr.out.output.Push(nil /* row */, ProducerMetadata{Err: err})
-		tr.out.Close()
+	if !tr.ReadyForFetch(ctx) {
 		return
 	}
-
+	if tr.Batching {
+		rowChan := tr.out.output.(*RowChannel)
+		for {
+			batch := make(RowBatch, 0, RowBatchSize)
+			for i := 0; i < RowBatchSize; i++ {
+				row, _, _, err := tr.fetcher.NextRow(ctx)
+				if err != nil {
+					tr.out.output.Push(nil /* row */, ProducerMetadata{Err: err})
+					tr.out.Close()
+					return
+				}
+				if row == nil {
+					break
+				}
+				// The fetcher reuses its row, so copy it before retaining it
+				// in the batch.
+				rowCopy := append(sqlbase.EncDatumRow(nil), row...)
+				batch = append(batch, rowCopy)
+			}
+			if len(batch) == 0 {
+				// TODO(asubiotto): This path skips the misplanned-ranges and
+				// trace metadata that the row-at-a-time path below sends.
+				tr.out.Close()
+				return
+			}
+			rowChan.PushBatch(batch)
+		}
+	}
 	for {
 		var row sqlbase.EncDatumRow
 		var err error
@@ -278,6 +306,11 @@ func (tr *tableReader) Run(ctx context.Context, wg *sync.WaitGroup) {
 	tr.out.Close()
 }
 
+// NextRow returns the next fetched row. A scan error, if any, is returned in
+// the metadata.
+func (tr *tableReader) NextRow(ctx context.Context) (sqlbase.EncDatumRow, ProducerMetadata) {
+	row, _, _, err := tr.fetcher.NextRow(ctx)
+	return row, ProducerMetadata{Err: err}
+}
+
 // generateScrubErrorRow will create an EncDatumRow describing a
 // physical check error encountered when scanning table data. The schema
 // of the EncDatumRow is the ScrubTypes constant.
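On the consumer side, draining a batching tableReader comes down to looping over NextBatch until the channel is exhausted. A sketch, assuming an initialized RowChannel `out` fed by a tableReader with Batching set (the helper name is hypothetical):

	// drainBatches is a hypothetical consumer loop for a RowChannel fed by a
	// batching producer. It stops on the first error or at end of stream.
	func drainBatches(out *RowChannel) error {
		for {
			batch, meta := out.NextBatch()
			if meta.Err != nil {
				return meta.Err
			}
			if batch == nil && meta.Empty() {
				// The producer closed the channel.
				return nil
			}
			// Process batch here.
			_ = batch
		}
	}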
diff --git a/pkg/sql/distsqlrun/tablereader_test.go b/pkg/sql/distsqlrun/tablereader_test.go
index 08fb0d1363af..02fc16863946 100644
--- a/pkg/sql/distsqlrun/tablereader_test.go
+++ b/pkg/sql/distsqlrun/tablereader_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/encoding"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/pkg/errors"
 )
 
 func TestTableReader(t *testing.T) {
@@ -295,3 +296,109 @@ func BenchmarkTableReader(b *testing.B) {
 		}
 	}
 }
+
+func BenchmarkNextSteps(b *testing.B) {
+	s, sqlDB, kvDB := serverutils.StartServer(b, base.TestServerArgs{})
+	defer s.Stopper().Stop(context.Background())
+
+	sqlutils.CreateTable(
+		b, sqlDB, "t",
+		"k INT PRIMARY KEY, v INT",
+		10000,
+		sqlutils.ToRowFn(sqlutils.RowIdxFn, sqlutils.RowModuloFn(42)),
+	)
+
+	tableDesc := sqlbase.GetTableDescriptor(kvDB, "test", "t")
+
+	evalCtx := tree.MakeTestingEvalContext()
+	defer evalCtx.Stop(context.Background())
+	flowCtx := FlowCtx{
+		EvalCtx:  evalCtx,
+		Settings: s.ClusterSettings(),
+		// Pass a DB without a TxnCoordSender.
+		txn:    client.NewTxn(client.NewDB(s.DistSender(), s.Clock()), s.NodeID()),
+		nodeID: s.NodeID(),
+	}
+	spec := TableReaderSpec{
+		Table: *tableDesc,
+		Spans: []TableReaderSpan{{Span: tableDesc.PrimaryIndexSpan()}},
+	}
+	post := PostProcessSpec{}
+	types := make([]sqlbase.ColumnType, len(tableDesc.Columns))
+	for i := range types {
+		types[i] = tableDesc.Columns[i].Type
+	}
+	b.Run("Normal", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			out := &RowChannel{}
+			out.Init(types)
+			errChan := make(chan error)
+			go func() {
+				for {
+					row, meta := out.Next()
+					if !meta.Empty() {
+						errChan <- errors.Errorf("unexpected metadata: %+v", meta)
+					}
+					if row == nil {
+						break
+					}
+				}
+				errChan <- nil
+			}()
+			tr, err := newTableReader(&flowCtx, &spec, &post, out)
+			if err != nil {
+				b.Fatal(err)
+			}
+			tr.Run(context.Background(), nil)
+
+			if err := <-errChan; err != nil {
+				b.Fatal(err)
+			}
+		}
+	})
+	b.Run("ElideRowChan", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			// The output doesn't matter; rows are read directly via NextRow.
+			out := &RowBuffer{}
+			tr, err := newTableReader(&flowCtx, &spec, &post, out)
+			if err != nil {
+				b.Fatal(err)
+			}
+			ctx := context.Background()
+			if !tr.ReadyForFetch(ctx) {
+				b.Fatal("failed to start scan")
+			}
+			for {
+				row, meta := tr.NextRow(ctx)
+				if !meta.Empty() {
+					b.Fatalf("unexpected metadata: %+v", meta)
+				}
+				if row == nil {
+					break
+				}
+			}
+		}
+	})
+	b.Run("RowBatch", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			out := &RowChannel{}
+			out.Init(types)
+			errChan := make(chan error)
+			go func() {
+				for {
+					batch, meta := out.NextBatch()
+					if !meta.Empty() {
+						errChan <- errors.Errorf("unexpected metadata: %+v", meta)
+					}
+					if batch == nil {
+						break
+					}
+				}
+				errChan <- nil
+			}()
+			tr, err := newTableReader(&flowCtx, &spec, &post, out)
+			if err != nil {
+				b.Fatal(err)
+			}
+			tr.Batching = true
+			tr.Run(context.Background(), nil)
+
+			// Wait for the consumer; without this the iteration could end
+			// before the goroutine has drained the channel.
+			if err := <-errChan; err != nil {
+				b.Fatal(err)
+			}
+		}
+	})
+}
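Since all three variants exercise the same plan shape, the relative numbers are what matter: Normal vs. ElideRowChan is meant to bound the cost of the RowChannel itself, and RowBatch shows how much of that cost batching wins back. With the series applied, the benchmarks can be compared using the standard Go tooling, e.g.:

	go test ./pkg/sql/distsqlrun -bench 'BenchmarkNextSteps|BenchmarkJoinAndCount' -benchmem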