Skip to content

Commit

Permalink
Merge #94348
Browse files Browse the repository at this point in the history
94348: sql,storage: some preliminary changes for KV projection pushdown r=yuzefovich a=yuzefovich

This PR contains a couple of commits that are mostly mechanical changes in preparation for the KV pushdown work. Some microbenchmarks of this PR are [here](https://gist.github.com/yuzefovich/24d3238bc638cc1121fd345c68ca3d0b), and they show effectively no change in scan speed.

Epic: CRDB-14837

Informs: #82323
Informs: #87610

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Jan 4, 2023
2 parents a4c71fd + a74ca72 commit 2c3d75f
Show file tree
Hide file tree
Showing 11 changed files with 330 additions and 191 deletions.
1 change: 1 addition & 0 deletions pkg/sql/colfetcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/util",
"//pkg/util/encoding",
"//pkg/util/hlc",
Expand Down
37 changes: 23 additions & 14 deletions pkg/sql/colfetcher/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/intsets"
Expand Down Expand Up @@ -213,9 +214,11 @@ type cFetcher struct {
// mvccDecodeStrategy controls whether or not MVCC timestamps should
// be decoded from KV's fetched. It is set if any of the requested tables
// are required to produce an MVCC timestamp system column.
mvccDecodeStrategy row.MVCCDecodingStrategy
mvccDecodeStrategy storage.MVCCDecodingStrategy

// fetcher is the underlying fetcher that provides KVs.
// nextKVer provides KVs.
nextKVer storage.NextKVer
// fetcher, if set, is the same object as nextKVer.
fetcher *row.KVFetcher
// bytesRead and batchRequestsIssued store the total number of bytes read
// and of BatchRequests issued, respectively, by this cFetcher throughout
Expand Down Expand Up @@ -323,7 +326,7 @@ func (cf *cFetcher) resetBatch() {
// Init sets up the cFetcher based on the table args. Only columns present in
// tableArgs.cols will be fetched.
func (cf *cFetcher) Init(
allocator *colmem.Allocator, kvFetcher *row.KVFetcher, tableArgs *cFetcherTableArgs,
allocator *colmem.Allocator, nextKVer storage.NextKVer, tableArgs *cFetcherTableArgs,
) error {
if tableArgs.spec.Version != fetchpb.IndexFetchSpecVersionInitial {
return errors.Newf("unsupported IndexFetchSpec version %d", tableArgs.spec.Version)
Expand Down Expand Up @@ -366,7 +369,7 @@ func (cf *cFetcher) Init(
switch colinfo.GetSystemColumnKindFromColumnID(colID) {
case catpb.SystemColumnKind_MVCCTIMESTAMP:
table.timestampOutputIdx = idx
cf.mvccDecodeStrategy = row.MVCCDecodingRequired
cf.mvccDecodeStrategy = storage.MVCCDecodingRequired
table.neededValueColsByIdx.Remove(idx)
case catpb.SystemColumnKind_TABLEOID:
table.oidOutputIdx = idx
Expand Down Expand Up @@ -457,7 +460,10 @@ func (cf *cFetcher) Init(
}

cf.table = table
cf.fetcher = kvFetcher
cf.nextKVer = nextKVer
if kvFetcher, ok := nextKVer.(*row.KVFetcher); ok {
cf.fetcher = kvFetcher
}
cf.accountingHelper.Init(allocator, cf.memoryLimit, cf.table.typs)

return nil
Expand Down Expand Up @@ -628,7 +634,7 @@ func (cf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
case stateInvalid:
return nil, errors.New("invalid fetcher state")
case stateInitFetch:
moreKVs, kv, _, finalReferenceToBatch, err := cf.fetcher.NextKV(ctx, cf.mvccDecodeStrategy)
moreKVs, kv, needsCopy, err := cf.nextKVer.NextKV(ctx, cf.mvccDecodeStrategy)
if err != nil {
return nil, cf.convertFetchError(ctx, err)
}
Expand Down Expand Up @@ -658,7 +664,7 @@ func (cf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
}
*/

cf.setNextKV(kv, finalReferenceToBatch)
cf.setNextKV(kv, needsCopy)
cf.machine.state[0] = stateDecodeFirstKVOfRow

case stateResetBatch:
Expand Down Expand Up @@ -778,7 +784,7 @@ func (cf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
cf.machine.state[0] = stateFetchNextKVWithUnfinishedRow

case stateFetchNextKVWithUnfinishedRow:
moreKVs, kv, _, finalReferenceToBatch, err := cf.fetcher.NextKV(ctx, cf.mvccDecodeStrategy)
moreKVs, kv, needsCopy, err := cf.nextKVer.NextKV(ctx, cf.mvccDecodeStrategy)
if err != nil {
return nil, cf.convertFetchError(ctx, err)
}
Expand All @@ -790,7 +796,7 @@ func (cf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
}
// TODO(jordan): if nextKV returns newSpan = true, set the new span
// prefix and indicate that it needs decoding.
cf.setNextKV(kv, finalReferenceToBatch)
cf.setNextKV(kv, needsCopy)
if debugState {
log.Infof(ctx, "decoding next key %s", cf.machine.nextKV.Key)
}
Expand Down Expand Up @@ -1314,10 +1320,13 @@ func (cf *cFetcher) Release() {
}

func (cf *cFetcher) Close(ctx context.Context) {
if cf != nil && cf.fetcher != nil {
cf.bytesRead = cf.fetcher.GetBytesRead()
cf.batchRequestsIssued = cf.fetcher.GetBatchRequestsIssued()
cf.fetcher.Close(ctx)
cf.fetcher = nil
if cf != nil {
cf.nextKVer = nil
if cf.fetcher != nil {
cf.bytesRead = cf.fetcher.GetBytesRead()
cf.batchRequestsIssued = cf.fetcher.GetBatchRequestsIssued()
cf.fetcher.Close(ctx)
cf.fetcher = nil
}
}
}
1 change: 1 addition & 0 deletions pkg/sql/row/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ go_library(
"//pkg/sql/span",
"//pkg/sql/sqlerrors",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util",
"//pkg/util/admission",
Expand Down
81 changes: 52 additions & 29 deletions pkg/sql/row/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/intsets"
Expand All @@ -52,27 +53,33 @@ const DebugRowFetch = false
// part of the output.
const noOutputColumn = -1

// kvBatchFetcherResponse contains a response from the KVBatchFetcher.
type kvBatchFetcherResponse struct {
// moreKVs indicates whether this response contains some keys from the
// KVBatchFetcherResponse contains a response from the KVBatchFetcher.
type KVBatchFetcherResponse struct {
// MoreKVs indicates whether this response contains some keys from the
// fetch.
//
// If true, then the fetch might (or might not) be complete at this point -
// another call to nextBatch() is needed to find out. If false, then the
// another call to NextBatch() is needed to find out. If false, then the
// fetch has already been completed and this response doesn't have any
// fetched data.
moreKVs bool
// Only one of kvs and batchResponse will be set. Which one is set depends
// on the server version. Both must be handled by calling code.
//
// kvs, if set, is a slice of roachpb.KeyValue, the deserialized kv pairs
// Note that it is possible that MoreKVs is true when neither KVs nor
// BatchResponse is set. This can occur when there was nothing to fetch for
// a Scan or a ReverseScan request, so the caller should just skip over such
// a response.
MoreKVs bool
// Only one of KVs and BatchResponse will be set. Which one is set depends
// on the request type (KVs is used for Gets and BatchResponse for Scans and
// ReverseScans). Both must be handled by calling code.
//
// KVs, if set, is a slice of roachpb.KeyValue, the deserialized kv pairs
// that were fetched.
kvs []roachpb.KeyValue
// batchResponse, if set, is a packed byte slice containing the keys and
// values. An empty batchResponse indicates that nothing was fetched for the
KVs []roachpb.KeyValue
// BatchResponse, if set, is a packed byte slice containing the keys and
// values. An empty BatchResponse indicates that nothing was fetched for the
// corresponding ScanRequest, and the caller is expected to skip over the
// response.
batchResponse []byte
BatchResponse []byte
// spanID is the ID associated with the span that generated this response.
spanID int
}
Expand All @@ -88,11 +95,22 @@ type KVBatchFetcher interface {
firstBatchKeyLimit rowinfra.KeyLimit,
) error

// nextBatch returns the next batch of rows. See kvBatchFetcherResponse for
// NextBatch returns the next batch of rows. See KVBatchFetcherResponse for
// details on what is returned.
nextBatch(ctx context.Context) (kvBatchFetcherResponse, error)
NextBatch(ctx context.Context) (KVBatchFetcherResponse, error)

// GetBytesRead returns the number of bytes read by this fetcher. It is safe
// for concurrent use and is able to handle a case of uninitialized fetcher.
GetBytesRead() int64

close(ctx context.Context)
// GetBatchRequestsIssued returns the number of BatchRequests issued by this
// fetcher throughout its lifetime. It is safe for concurrent use and is
// able to handle a case of uninitialized fetcher.
GetBatchRequestsIssued() int64

// Close releases the resources of this KVBatchFetcher. Must be called once
// the fetcher is no longer in use.
Close(ctx context.Context)
}

type tableInfo struct {
Expand Down Expand Up @@ -170,7 +188,7 @@ type Fetcher struct {

// mvccDecodeStrategy controls whether or not MVCC timestamps should
// be decoded from KV's fetched.
mvccDecodeStrategy MVCCDecodingStrategy
mvccDecodeStrategy storage.MVCCDecodingStrategy

// -- Fields updated during a scan --

Expand Down Expand Up @@ -299,7 +317,7 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error {
switch colinfo.GetSystemColumnKindFromColumnID(colID) {
case catpb.SystemColumnKind_MVCCTIMESTAMP:
table.timestampOutputIdx = idx
rf.mvccDecodeStrategy = MVCCDecodingRequired
rf.mvccDecodeStrategy = storage.MVCCDecodingRequired

case catpb.SystemColumnKind_TABLEOID:
table.oidOutputIdx = idx
Expand Down Expand Up @@ -378,21 +396,22 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error {
}
rf.kvFetcher = args.StreamingKVFetcher
} else if !args.WillUseKVProvider {
fetcherArgs := kvBatchFetcherArgs{
var batchRequestsIssued int64
fetcherArgs := newTxnKVFetcherArgs{
reverse: args.Reverse,
lockStrength: args.LockStrength,
lockWaitPolicy: args.LockWaitPolicy,
lockTimeout: args.LockTimeout,
acc: rf.kvFetcherMemAcc,
forceProductionKVBatchSize: args.ForceProductionKVBatchSize,
batchRequestsIssued: &batchRequestsIssued,
}
var batchRequestsIssued int64
if args.Txn != nil {
fetcherArgs.sendFn = makeKVBatchFetcherDefaultSendFunc(args.Txn, &batchRequestsIssued)
fetcherArgs.sendFn = makeTxnKVFetcherDefaultSendFunc(args.Txn, &batchRequestsIssued)
fetcherArgs.requestAdmissionHeader = args.Txn.AdmissionHeader()
fetcherArgs.responseAdmissionQ = args.Txn.DB().SQLKVResponseAdmissionQ
}
rf.kvFetcher = newKVFetcher(newKVBatchFetcher(fetcherArgs), &batchRequestsIssued)
rf.kvFetcher = newKVFetcher(newTxnKVFetcherInternal(fetcherArgs))
}

return nil
Expand All @@ -407,8 +426,12 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error {
// - allowing the caller to update the Fetcher to use the new txn. In this case,
// the caller should be careful since reads performed under different txns
// do not provide consistent view of the data.
//
// Note that this resets the number of batch requests issued by the Fetcher.
// Consider using GetBatchRequestsIssued if that information is needed.
func (rf *Fetcher) SetTxn(txn *kv.Txn) error {
sendFn := makeKVBatchFetcherDefaultSendFunc(txn, rf.kvFetcher.atomics.batchRequestsIssued)
var batchRequestsIssued int64
sendFn := makeTxnKVFetcherDefaultSendFunc(txn, &batchRequestsIssued)
return rf.setTxnAndSendFn(txn, sendFn)
}

Expand Down Expand Up @@ -610,10 +633,7 @@ func (rf *Fetcher) ConsumeKVProvider(ctx context.Context, f *KVProvider) error {
if rf.kvFetcher != nil {
rf.kvFetcher.Close(ctx)
}
// We won't actually perform any KV reads, so we don't need to track the
// number of batch requests issued - the case of the KVProvider is handled
// separately in GetBatchRequestsIssued().
rf.kvFetcher = newKVFetcher(f, nil /* batchRequestsIssued */)
rf.kvFetcher = newKVFetcher(f)
return rf.startScan(ctx)
}

Expand Down Expand Up @@ -652,11 +672,11 @@ func (rf *Fetcher) setNextKV(kv roachpb.KeyValue, needsCopy bool) {
// nextKey retrieves the next key/value and sets kv/kvEnd. Returns whether the
// key indicates a new row (as opposed to another family for the current row).
func (rf *Fetcher) nextKey(ctx context.Context) (newRow bool, spanID int, _ error) {
ok, kv, spanID, finalReferenceToBatch, err := rf.kvFetcher.NextKV(ctx, rf.mvccDecodeStrategy)
ok, kv, spanID, needsCopy, err := rf.kvFetcher.nextKV(ctx, rf.mvccDecodeStrategy)
if err != nil {
return false, 0, ConvertFetchError(&rf.table.spec, err)
}
rf.setNextKV(kv, finalReferenceToBatch)
rf.setNextKV(kv, needsCopy)

if !ok {
// No more keys in the scan.
Expand Down Expand Up @@ -1260,13 +1280,16 @@ func (rf *Fetcher) Key() roachpb.Key {

// GetBytesRead returns the total number of bytes read by the underlying
// KVFetcher. It returns 0 when called on a nil Fetcher or when no KVFetcher
// has been set.
func (rf *Fetcher) GetBytesRead() int64 {
// Guard against a nil receiver or an unset kvFetcher so that the delegation
// below never dereferences nil.
if rf == nil || rf.kvFetcher == nil {
return 0
}
return rf.kvFetcher.GetBytesRead()
}

// GetBatchRequestsIssued returns total number of BatchRequests issued by the
// underlying KVFetcher.
func (rf *Fetcher) GetBatchRequestsIssued() int64 {
if rf == nil || rf.args.WillUseKVProvider {
if rf == nil || rf.kvFetcher == nil || rf.args.WillUseKVProvider {
return 0
}
return rf.kvFetcher.GetBatchRequestsIssued()
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/row/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,9 @@ func TestRowFetcherReset(t *testing.T) {
); err != nil {
t.Fatal(err)
}
// We need to nil out the kvFetchers since these are stored as pointers.
fetcher.kvFetcher = nil
resetFetcher.kvFetcher = nil

if !reflect.DeepEqual(resetFetcher, fetcher) {
t.Fatal("unequal before and after reset", resetFetcher, fetcher)
Expand Down
Loading

0 comments on commit 2c3d75f

Please sign in to comment.