Skip to content

Commit

Permalink
WIP on always serializing and smart finalReferenceToBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzefovich committed Dec 1, 2022
1 parent 954ec2d commit 5a48615
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 46 deletions.
19 changes: 7 additions & 12 deletions pkg/sql/colfetcher/cfetcher_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,33 +45,28 @@ func init() {
storage.GetCFetcherWrapper = newCFetcherWrapper
}

func (c *cFetcherWrapper) NextBatch(
ctx context.Context, serialize bool,
) ([]byte, coldata.Batch, error) {
func (c *cFetcherWrapper) NextBatch(ctx context.Context) ([]byte, error) {
batch, err := c.fetcher.NextBatch(ctx)
if err != nil {
return nil, nil, err
return nil, err
}
if l := batch.Length(); c.removeLastRow && l > 0 {
batch.SetLength(l - 1)
}
if batch.Length() == 0 {
return nil, nil, nil
return nil, nil
}
c.sawBatch = true
if !serialize {
return nil, batch, nil
}
data, err := c.converter.BatchToArrow(batch)
if err != nil {
return nil, nil, err
return nil, err
}
c.buf.Reset()
_, _, err = c.serializer.Serialize(&c.buf, data, batch.Length())
if err != nil {
return nil, nil, err
return nil, err
}
return c.buf.Bytes(), nil, nil
return c.buf.Bytes(), nil
}

// ContinuesFirstRow returns true if the given key belongs to the same SQL row
Expand Down Expand Up @@ -168,7 +163,7 @@ func newCFetcherWrapper(
0, /* estimatedRowCount */
false, /* traceKV */
true, /* singleUse */
true, /* allocateFreshBatches */
false, /* allocateFreshBatches */
allowNullsInNonNullableOnLastRowInBatch,
}

Expand Down
1 change: 0 additions & 1 deletion pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ go_library(
"//pkg/util/buildutil",
"//pkg/util/encoding",
"//pkg/util/envutil",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/iterutil",
Expand Down
48 changes: 15 additions & 33 deletions pkg/storage/col_mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ import (
"context"
"math"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -46,12 +44,9 @@ type NextKVer interface {
// CFetcherWrapper is a wrapper around a colfetcher.cFetcher that returns a
// serialized set of bytes or a column-oriented batch.
type CFetcherWrapper interface {
// NextBatch gives back the next column-oriented batch.
//
// If serialize is true, the returned batch will be the byte slice,
// serialized in Arrow batch format. If serialize is false, the returned
// batch will be the coldata.Batch.
NextBatch(ctx context.Context, serialize bool) ([]byte, coldata.Batch, error)
// NextBatch gives back the next column-oriented batch, serialized in Arrow
// batch format.
NextBatch(ctx context.Context) ([]byte, error)

// Close releases the resources held by this CFetcherWrapper. It *must* be
// called after use of the wrapper.
Expand Down Expand Up @@ -173,7 +168,7 @@ func (f *mvccScanFetchAdapter) NextKV(
}
// TODO: think through how we need to handle the copying with multiple
// column families (here or in the cFetcher).
return true, lastKV, true, nil
return true, lastKV, f.scanner.wholeRows, nil
}

// MVCCScanToCols is like MVCCScan, but it returns KVData in a serialized
Expand Down Expand Up @@ -286,32 +281,19 @@ func mvccScanToCols(
//}
adapter.onNextKV = adapter.seek
adapter.results.wrapper = wrapper
if grpcutil.IsLocalRequestContext(ctx) {
for {
_, batch, err := wrapper.NextBatch(ctx, false /* serialize */)
if err != nil {
return res, err
}
if batch == nil {
break
}
res.ColBatches = append(res.ColBatches, batch)
for {
batch, err := wrapper.NextBatch(ctx)
if err != nil {
return res, err
}
} else {
for {
batch, _, err := wrapper.NextBatch(ctx, true /* serialize */)
if err != nil {
return res, err
}
if batch == nil {
break
}
// We need to make a copy since the wrapper reuses underlying bytes
// buffer.
b := make([]byte, len(batch))
copy(b, batch)
res.KVData = append(res.KVData, b)
if batch == nil {
break
}
// We need to make a copy since the wrapper reuses its underlying bytes
// buffer across NextBatch calls.
b := make([]byte, len(batch))
copy(b, batch)
res.KVData = append(res.KVData, b)
}

res.ResumeSpan, res.ResumeReason, res.ResumeNextBytes, err = mvccScanner.afterScan()
Expand Down

0 comments on commit 5a48615

Please sign in to comment.