From 173634158d8647d573c8a3f0b7523dc3936c227c Mon Sep 17 00:00:00 2001
From: Rebecca Taft
Date: Fri, 27 Aug 2021 07:18:33 -0500
Subject: [PATCH] row,rowinfra: refactor some types and constants out of row

This commit creates a new package called rowinfra and moves some
constants and type definitions from the row package to the new package
so that they can be used externally without adding a dependency on the
row package.

Note: I don't really like the name rowinfra, but I think it works given
the current name of the row package. I think it would be better to
rename row to something that hints at its actual purpose, which seems
to be to serve as the interface between the kv and sql layers. If we
rename row, then we can rename rowinfra to match.

Release note: None

Release justification: Low risk, high benefit change to existing
functionality that is needed for a follow-on commit.
---
 pkg/sql/backfill.go                  |  3 +-
 pkg/sql/backfill/backfill.go         |  7 +++--
 pkg/sql/colfetcher/cfetcher.go       | 11 +++----
 pkg/sql/colfetcher/colbatch_scan.go  | 14 ++++-----
 pkg/sql/colfetcher/index_join.go     |  6 ++--
 pkg/sql/indexbackfiller_test.go      |  3 +-
 pkg/sql/row/fetcher.go               | 35 +++++-----------------
 pkg/sql/row/fetcher_test.go          | 17 +++++------
 pkg/sql/row/kv_batch_fetcher.go      | 31 +++++++++-----------
 pkg/sql/row/kv_fetcher.go            |  5 ++--
 pkg/sql/rowexec/backfiller.go        |  6 ++--
 pkg/sql/rowexec/columnbackfiller.go  |  4 +--
 pkg/sql/rowexec/inverted_joiner.go   |  3 +-
 pkg/sql/rowexec/joinreader.go        | 19 ++++++------
 pkg/sql/rowexec/rowfetcher.go        |  9 +++---
 pkg/sql/rowexec/scrub_tablereader.go |  5 ++--
 pkg/sql/rowexec/stats.go             |  9 +++---
 pkg/sql/rowexec/tablereader.go       | 17 +++++------
 pkg/sql/rowexec/zigzagjoiner.go      |  7 +++--
 pkg/sql/rowinfra/base.go             | 43 ++++++++++++++++++++++++++++
 pkg/sql/tablewriter_delete.go        |  5 ++--
 21 files changed, 145 insertions(+), 114 deletions(-)
 create mode 100644 pkg/sql/rowinfra/base.go

diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go
index ee198214421f..129a032351a5 100644
--- a/pkg/sql/backfill.go
+++ b/pkg/sql/backfill.go
@@ -40,6 +40,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/row"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
+	"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
@@ -2422,7 +2423,7 @@ func columnBackfillInTxn(
 	for sp.Key != nil {
 		var err error
 		sp.Key, err = backfiller.RunColumnBackfillChunk(ctx,
-			txn, tableDesc, sp, row.RowLimit(columnBackfillBatchSize.Get(&evalCtx.Settings.SV)),
+			txn, tableDesc, sp, rowinfra.RowLimit(columnBackfillBatchSize.Get(&evalCtx.Settings.SV)),
 			false /*alsoCommit*/, traceKV)
 		if err != nil {
 			return err
diff --git a/pkg/sql/backfill/backfill.go b/pkg/sql/backfill/backfill.go
index ed0b9df17d68..b9cec78eab37 100644
--- a/pkg/sql/backfill/backfill.go
+++ b/pkg/sql/backfill/backfill.go
@@ -25,6 +25,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/row"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
+	"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/transform"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
@@ -247,7 +248,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
 	txn *kv.Txn,
 	tableDesc catalog.TableDescriptor,
 	sp roachpb.Span,
-	chunkSize row.RowLimit,
+	
chunkSize rowinfra.RowLimit, alsoCommit bool, traceKV bool, ) (roachpb.Key, error) { @@ -289,7 +290,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk( // populated and deleted by the OLTP commands but not otherwise // read or used if err := cb.fetcher.StartScan( - ctx, txn, []roachpb.Span{sp}, row.DefaultBatchBytesLimit, chunkSize, + ctx, txn, []roachpb.Span{sp}, rowinfra.DefaultBatchBytesLimit, chunkSize, traceKV, false, /* forceProductionKVBatchSize */ ); err != nil { log.Errorf(ctx, "scan error: %s", err) @@ -811,7 +812,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk( } defer fetcher.Close(ctx) if err := fetcher.StartScan( - ctx, txn, []roachpb.Span{sp}, row.DefaultBatchBytesLimit, initBufferSize, + ctx, txn, []roachpb.Span{sp}, rowinfra.DefaultBatchBytesLimit, initBufferSize, traceKV, false, /* forceProductionKVBatchSize */ ); err != nil { log.Errorf(ctx, "scan error: %s", err) diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index d95869d77df2..b44a742dd3b0 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/scrub" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -664,15 +665,15 @@ func (rf *cFetcher) StartScan( spans roachpb.Spans, bsHeader *roachpb.BoundedStalenessHeader, limitBatches bool, - batchBytesLimit row.BytesLimit, - limitHint row.RowLimit, + batchBytesLimit rowinfra.BytesLimit, + limitHint rowinfra.RowLimit, traceKV bool, forceProductionKVBatchSize bool, ) error { if len(spans) == 0 { return errors.AssertionFailedf("no spans") } - if !limitBatches && batchBytesLimit != row.NoBytesLimit { + if !limitBatches && batchBytesLimit != rowinfra.NoBytesLimit { return errors.AssertionFailedf("batchBytesLimit set without limitBatches") } @@ -681,13 +682,13 @@ func (rf *cFetcher) StartScan( // If we have a limit hint, we limit the first batch size. Subsequent // batches get larger to avoid making things too slow (e.g. in case we have // a very restrictive filter and actually have to retrieve a lot of rows). - firstBatchLimit := row.KeyLimit(limitHint) + firstBatchLimit := rowinfra.KeyLimit(limitHint) if firstBatchLimit != 0 { // The limitHint is a row limit, but each row could be made up // of more than one key. We take the maximum possible keys // per row out of all the table rows we could potentially // scan over. - firstBatchLimit = row.KeyLimit(int(limitHint) * rf.maxKeysPerRow) + firstBatchLimit = rowinfra.KeyLimit(int(limitHint) * rf.maxKeysPerRow) // We need an extra key to make sure we form the last row. 
firstBatchLimit++ } diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index b71b6d1de9d9..1d6abdffd7ba 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -24,7 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colmem" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - "github.com/cockroachdb/cockroach/pkg/sql/row" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" @@ -52,8 +52,8 @@ type ColBatchScan struct { flowCtx *execinfra.FlowCtx bsHeader *roachpb.BoundedStalenessHeader rf *cFetcher - limitHint row.RowLimit - batchBytesLimit row.BytesLimit + limitHint rowinfra.RowLimit + batchBytesLimit rowinfra.BytesLimit parallelize bool // tracingSpan is created when the stats should be collected for the query // execution, and it will be finished when closing the operator. @@ -194,7 +194,7 @@ func NewColBatchScan( return nil, errors.AssertionFailedf("attempting to create a cFetcher with the IsCheck flag set") } - limitHint := row.RowLimit(execinfra.LimitHint(spec.LimitHint, post)) + limitHint := rowinfra.RowLimit(execinfra.LimitHint(spec.LimitHint, post)) // TODO(ajwerner): The need to construct an immutable here // indicates that we're probably doing this wrong. Instead we should be // just setting the ID and Version in the spec or something like that and @@ -261,11 +261,11 @@ func NewColBatchScan( // just in case. spec.Parallelize = false } - var batchBytesLimit row.BytesLimit + var batchBytesLimit rowinfra.BytesLimit if !spec.Parallelize { - batchBytesLimit = row.BytesLimit(spec.BatchBytesLimit) + batchBytesLimit = rowinfra.BytesLimit(spec.BatchBytesLimit) if batchBytesLimit == 0 { - batchBytesLimit = row.DefaultBatchBytesLimit + batchBytesLimit = rowinfra.DefaultBatchBytesLimit } } diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index 5e677400751a..1ca5d235b002 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -26,8 +26,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/memsize" - "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" @@ -172,8 +172,8 @@ func (s *ColIndexJoin) Next() coldata.Batch { spans, nil, /* bsHeader */ false, /* limitBatches */ - row.NoBytesLimit, - row.NoRowLimit, + rowinfra.NoBytesLimit, + rowinfra.NoRowLimit, s.flowCtx.TraceKV, s.flowCtx.EvalCtx.TestingKnobs.ForceProductionBatchSizes, ); err != nil { diff --git a/pkg/sql/indexbackfiller_test.go b/pkg/sql/indexbackfiller_test.go index c56e2000a1b5..75bbe45ba4b3 100644 --- a/pkg/sql/indexbackfiller_test.go +++ b/pkg/sql/indexbackfiller_test.go @@ -33,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/tests" 
"github.com/cockroachdb/cockroach/pkg/sql/types" @@ -400,7 +401,7 @@ INSERT INTO foo VALUES (1), (10), (100); )) require.NoError(t, fetcher.StartScan( - ctx, txn, spans, row.NoBytesLimit, 0, true, false, /* forceProductionBatchSize */ + ctx, txn, spans, rowinfra.NoBytesLimit, 0, true, false, /* forceProductionBatchSize */ )) var rows []tree.Datums for { diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go index 4ff8ea0d6411..cfaff2ccfc38 100644 --- a/pkg/sql/row/fetcher.go +++ b/pkg/sql/row/fetcher.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/scrub" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -563,28 +564,6 @@ func (rf *Fetcher) GetTables() []catalog.Descriptor { return ret } -// RowLimit represents a response limit expressed in terms of number of result -// rows. RowLimits get ultimately converted to KeyLimits and are translated into -// BatchRequest.MaxSpanRequestKeys. -type RowLimit uint64 - -// KeyLimit represents a response limit expressed in terms of number of keys. -type KeyLimit int64 - -// BytesLimit represents a response limit expressed in terms of the size of the -// results. A BytesLimit ultimately translates into BatchRequest.TargetBytes. -type BytesLimit uint64 - -// NoRowLimit can be passed to Fetcher.StartScan to signify that the caller -// doesn't want to limit the number of result rows for each scan request. -const NoRowLimit RowLimit = 0 - -// NoBytesLimit can be passed to Fetcher.StartScan to signify that the caller -// doesn't want to limit the size of results for each scan request. -// -// See also DefaultBatchBytesLimit. -const NoBytesLimit BytesLimit = 0 - // StartScan initializes and starts the key-value scan. Can be used multiple // times. // @@ -606,8 +585,8 @@ func (rf *Fetcher) StartScan( ctx context.Context, txn *kv.Txn, spans roachpb.Spans, - batchBytesLimit BytesLimit, - rowLimitHint RowLimit, + batchBytesLimit rowinfra.BytesLimit, + rowLimitHint rowinfra.RowLimit, traceKV bool, forceProductionKVBatchSize bool, ) error { @@ -658,8 +637,8 @@ func (rf *Fetcher) StartInconsistentScan( initialTimestamp hlc.Timestamp, maxTimestampAge time.Duration, spans roachpb.Spans, - batchBytesLimit BytesLimit, - rowLimitHint RowLimit, + batchBytesLimit rowinfra.BytesLimit, + rowLimitHint rowinfra.RowLimit, traceKV bool, forceProductionKVBatchSize bool, ) error { @@ -736,7 +715,7 @@ func (rf *Fetcher) StartInconsistentScan( return rf.StartScanFrom(ctx, &f) } -func (rf *Fetcher) rowLimitToKeyLimit(rowLimitHint RowLimit) KeyLimit { +func (rf *Fetcher) rowLimitToKeyLimit(rowLimitHint rowinfra.RowLimit) rowinfra.KeyLimit { if rowLimitHint == 0 { return 0 } @@ -748,7 +727,7 @@ func (rf *Fetcher) rowLimitToKeyLimit(rowLimitHint RowLimit) KeyLimit { // rows we could potentially scan over. // // We add an extra key to make sure we form the last row. - return KeyLimit(int64(rowLimitHint)*int64(rf.maxKeysPerRow) + 1) + return rowinfra.KeyLimit(int64(rowLimitHint)*int64(rf.maxKeysPerRow) + 1) } // StartScanFrom initializes and starts a scan from the given kvBatchFetcher. 
Can be diff --git a/pkg/sql/row/fetcher_test.go b/pkg/sql/row/fetcher_test.go index 045a04fe66a8..1ec8e0e4d76e 100644 --- a/pkg/sql/row/fetcher_test.go +++ b/pkg/sql/row/fetcher_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -164,8 +165,8 @@ func TestNextRowSingle(t *testing.T) { context.Background(), kv.NewTxn(ctx, kvDB, 0), roachpb.Spans{tableDesc.IndexSpan(keys.SystemSQLCodec, tableDesc.GetPrimaryIndexID())}, - NoBytesLimit, - NoRowLimit, + rowinfra.NoBytesLimit, + rowinfra.NoRowLimit, false, /*traceKV*/ false, /*forceProductionKVBatchSize*/ ); err != nil { @@ -285,7 +286,7 @@ func TestNextRowBatchLimiting(t *testing.T) { context.Background(), kv.NewTxn(ctx, kvDB, 0), roachpb.Spans{tableDesc.IndexSpan(keys.SystemSQLCodec, tableDesc.GetPrimaryIndexID())}, - DefaultBatchBytesLimit, + rowinfra.DefaultBatchBytesLimit, 10, /*limitHint*/ false, /*traceKV*/ false, /*forceProductionKVBatchSize*/ @@ -396,8 +397,8 @@ func TestRowFetcherMemoryLimits(t *testing.T) { context.Background(), kv.NewTxn(ctx, kvDB, 0), roachpb.Spans{tableDesc.IndexSpan(keys.SystemSQLCodec, tableDesc.GetPrimaryIndexID())}, - NoBytesLimit, - NoRowLimit, + rowinfra.NoBytesLimit, + rowinfra.NoRowLimit, false, /*traceKV*/ false, /*forceProductionKVBatchSize*/ ) @@ -482,7 +483,7 @@ INDEX(c) roachpb.Spans{indexSpan, roachpb.Span{Key: midKey, EndKey: endKey}, }, - DefaultBatchBytesLimit, + rowinfra.DefaultBatchBytesLimit, // Set a limitHint of 1 to more quickly end the first batch, causing a // batch that ends between rows. 1, /*limitHint*/ @@ -645,8 +646,8 @@ func TestNextRowSecondaryIndex(t *testing.T) { context.Background(), kv.NewTxn(ctx, kvDB, 0), roachpb.Spans{tableDesc.IndexSpan(keys.SystemSQLCodec, tableDesc.PublicNonPrimaryIndexes()[0].GetID())}, - NoBytesLimit, - NoRowLimit, + rowinfra.NoBytesLimit, + rowinfra.NoRowLimit, false, /*traceKV*/ false, /*forceProductionKVBatchSize*/ ); err != nil { diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index 648254129fe8..9605fdf6ba0e 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -36,25 +37,19 @@ import ( // only be used by tests the output of which differs if defaultKVBatchSize is // randomized. 
// TODO(radu): parameters like this should be configurable -func getKVBatchSize(forceProductionKVBatchSize bool) KeyLimit { +func getKVBatchSize(forceProductionKVBatchSize bool) rowinfra.KeyLimit { if forceProductionKVBatchSize { - return productionKVBatchSize + return rowinfra.ProductionKVBatchSize } return defaultKVBatchSize } -var defaultKVBatchSize = KeyLimit(util.ConstantWithMetamorphicTestValue( +var defaultKVBatchSize = rowinfra.KeyLimit(util.ConstantWithMetamorphicTestValue( "kv-batch-size", - int(productionKVBatchSize), /* defaultValue */ - 1, /* metamorphicValue */ + int(rowinfra.ProductionKVBatchSize), /* defaultValue */ + 1, /* metamorphicValue */ )) -const productionKVBatchSize KeyLimit = 100000 - -// DefaultBatchBytesLimit is the maximum number of bytes a scan request can -// return. -const DefaultBatchBytesLimit BytesLimit = 10 << 20 // 10 MB - // sendFunc is the function used to execute a KV batch; normally // wraps (*client.Txn).Send. type sendFunc func( @@ -74,7 +69,7 @@ type txnKVFetcher struct { // to this value and subsequent batches are larger (up to a limit, see // getKVBatchSize()). If not set, batches do not have a key limit (they might // still have a bytes limit as per batchBytesLimit). - firstBatchKeyLimit KeyLimit + firstBatchKeyLimit rowinfra.KeyLimit // If batchBytesLimit is set, the batches are limited in response size. This // protects from OOMs, but comes at the cost of inhibiting DistSender-level // parallelism within a batch. @@ -83,7 +78,7 @@ type txnKVFetcher struct { // there is only a "small" amount of data to be read (i.e. scanning `spans` // doesn't result in too much data), and wants to preserve concurrency for // this scans inside of DistSender. - batchBytesLimit BytesLimit + batchBytesLimit rowinfra.BytesLimit reverse bool // lockStrength represents the locking mode to use when fetching KVs. @@ -122,11 +117,11 @@ var _ kvBatchFetcher = &txnKVFetcher{} // getBatchKeyLimit returns the max size of the next batch. The size is // expressed in number of result keys (i.e. this size will be used for // MaxSpanRequestKeys). -func (f *txnKVFetcher) getBatchKeyLimit() KeyLimit { +func (f *txnKVFetcher) getBatchKeyLimit() rowinfra.KeyLimit { return f.getBatchKeyLimitForIdx(f.batchIdx) } -func (f *txnKVFetcher) getBatchKeyLimitForIdx(batchIdx int) KeyLimit { +func (f *txnKVFetcher) getBatchKeyLimitForIdx(batchIdx int) rowinfra.KeyLimit { if f.firstBatchKeyLimit == 0 { return 0 } @@ -232,15 +227,15 @@ func makeKVBatchFetcherDefaultSendFunc(txn *kv.Txn) sendFunc { // makeKVBatchFetcher initializes a kvBatchFetcher for the given spans. If // useBatchLimit is true, the number of result keys per batch is limited; the // limit grows between subsequent batches, starting at firstBatchKeyLimit (if not -// 0) to productionKVBatchSize. +// 0) to ProductionKVBatchSize. // // Batch limits can only be used if the spans are ordered. 
func makeKVBatchFetcher( sendFn sendFunc, spans roachpb.Spans, reverse bool, - batchBytesLimit BytesLimit, - firstBatchKeyLimit KeyLimit, + batchBytesLimit rowinfra.BytesLimit, + firstBatchKeyLimit rowinfra.KeyLimit, lockStrength descpb.ScanLockingStrength, lockWaitPolicy descpb.ScanLockingWaitPolicy, lockTimeout time.Duration, diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go index 40ae4acb05ec..57d99b9cb0fe 100644 --- a/pkg/sql/row/kv_fetcher.go +++ b/pkg/sql/row/kv_fetcher.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -49,8 +50,8 @@ func NewKVFetcher( spans roachpb.Spans, bsHeader *roachpb.BoundedStalenessHeader, reverse bool, - batchBytesLimit BytesLimit, - firstBatchLimit KeyLimit, + batchBytesLimit rowinfra.BytesLimit, + firstBatchLimit rowinfra.KeyLimit, lockStrength descpb.ScanLockingStrength, lockWaitPolicy descpb.ScanLockingWaitPolicy, lockTimeout time.Duration, diff --git a/pkg/sql/rowexec/backfiller.go b/pkg/sql/rowexec/backfiller.go index 5d9027eb3aab..c1937e558181 100644 --- a/pkg/sql/rowexec/backfiller.go +++ b/pkg/sql/rowexec/backfiller.go @@ -24,7 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - "github.com/cockroachdb/cockroach/pkg/sql/row" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -46,7 +46,7 @@ type chunkBackfiller interface { runChunk( ctx context.Context, span roachpb.Span, - chunkSize row.RowLimit, + chunkSize rowinfra.RowLimit, readAsOf hlc.Timestamp, ) (roachpb.Key, error) @@ -148,7 +148,7 @@ func (b *backfiller) mainLoop(ctx context.Context) (roachpb.Spans, error) { for todo.Key != nil { log.VEventf(ctx, 3, "%s backfiller starting chunk %d: %s", b.name, chunks, todo) var err error - todo.Key, err = b.chunks.runChunk(ctx, todo, row.RowLimit(b.spec.ChunkSize), b.spec.ReadAsOf) + todo.Key, err = b.chunks.runChunk(ctx, todo, rowinfra.RowLimit(b.spec.ChunkSize), b.spec.ReadAsOf) if err != nil { return nil, err } diff --git a/pkg/sql/rowexec/columnbackfiller.go b/pkg/sql/rowexec/columnbackfiller.go index 7da002729454..c1c105b4c285 100644 --- a/pkg/sql/rowexec/columnbackfiller.go +++ b/pkg/sql/rowexec/columnbackfiller.go @@ -20,7 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - "github.com/cockroachdb/cockroach/pkg/sql/row" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -100,7 +100,7 @@ func (cb *columnBackfiller) CurrentBufferFill() float32 { // runChunk implements the chunkBackfiller interface. 
func (cb *columnBackfiller) runChunk( - ctx context.Context, sp roachpb.Span, chunkSize row.RowLimit, _ hlc.Timestamp, + ctx context.Context, sp roachpb.Span, chunkSize rowinfra.RowLimit, _ hlc.Timestamp, ) (roachpb.Key, error) { var key roachpb.Key var commitWaitFn func(context.Context) error diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index abe35f1f7bbe..fea35ca15385 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/scrub" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/span" @@ -493,7 +494,7 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce log.VEventf(ij.Ctx, 1, "scanning %d spans", len(indexSpans)) if err = ij.fetcher.StartScan( - ij.Ctx, ij.FlowCtx.Txn, indexSpans, row.NoBytesLimit, row.NoRowLimit, + ij.Ctx, ij.FlowCtx.Txn, indexSpans, rowinfra.NoBytesLimit, rowinfra.NoRowLimit, ij.FlowCtx.TraceKV, ij.EvalCtx.TestingKnobs.ForceProductionBatchSizes, ); err != nil { ij.MoveToDraining(err) diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 78285820e4cb..2aa8884913f6 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/rowcontainer" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/scrub" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/span" @@ -176,7 +177,7 @@ type joinReader struct { // lookupBatchBytesLimit controls the TargetBytes of lookup requests. If 0, a // default will be used. Regardless of this value, bytes limits aren't always // used. - lookupBatchBytesLimit row.BytesLimit + lookupBatchBytesLimit rowinfra.BytesLimit } var _ execinfra.Processor = &joinReader{} @@ -253,7 +254,7 @@ func newJoinReader( // cases, we use limits. 
shouldLimitBatches: !spec.LookupColumnsAreKey && readerType == lookupJoinReaderType, readerType: readerType, - lookupBatchBytesLimit: row.BytesLimit(spec.LookupBatchBytesLimit), + lookupBatchBytesLimit: rowinfra.BytesLimit(spec.LookupBatchBytesLimit), } if readerType != indexJoinReaderType { jr.groupingState = &inputBatchGroupingState{doGrouping: spec.LeftJoinWithPairedJoiner} @@ -748,17 +749,17 @@ func (jr *joinReader) readInput() ( } log.VEventf(jr.Ctx, 1, "scanning %d spans", len(spans)) - var bytesLimit row.BytesLimit + var bytesLimit rowinfra.BytesLimit if !jr.shouldLimitBatches { - bytesLimit = row.NoBytesLimit + bytesLimit = rowinfra.NoBytesLimit } else { bytesLimit = jr.lookupBatchBytesLimit if jr.lookupBatchBytesLimit == 0 { - bytesLimit = row.DefaultBatchBytesLimit + bytesLimit = rowinfra.DefaultBatchBytesLimit } } if err := jr.fetcher.StartScan( - jr.Ctx, jr.FlowCtx.Txn, spans, bytesLimit, row.NoRowLimit, + jr.Ctx, jr.FlowCtx.Txn, spans, bytesLimit, rowinfra.NoRowLimit, jr.FlowCtx.TraceKV, jr.EvalCtx.TestingKnobs.ForceProductionBatchSizes, ); err != nil { jr.MoveToDraining(err) @@ -827,12 +828,12 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet sort.Sort(spans) log.VEventf(jr.Ctx, 1, "scanning %d remote spans", len(spans)) - bytesLimit := row.DefaultBatchBytesLimit + bytesLimit := rowinfra.DefaultBatchBytesLimit if !jr.shouldLimitBatches { - bytesLimit = row.NoBytesLimit + bytesLimit = rowinfra.NoBytesLimit } if err := jr.fetcher.StartScan( - jr.Ctx, jr.FlowCtx.Txn, spans, bytesLimit, row.NoRowLimit, + jr.Ctx, jr.FlowCtx.Txn, spans, bytesLimit, rowinfra.NoRowLimit, jr.FlowCtx.TraceKV, jr.EvalCtx.TestingKnobs.ForceProductionBatchSizes, ); err != nil { jr.MoveToDraining(err) diff --git a/pkg/sql/rowexec/rowfetcher.go b/pkg/sql/rowexec/rowfetcher.go index fd87efa52130..12532c289bd5 100644 --- a/pkg/sql/rowexec/rowfetcher.go +++ b/pkg/sql/rowexec/rowfetcher.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/mon" @@ -32,8 +33,8 @@ import ( // collector wrapper can be plugged in. 
type rowFetcher interface { StartScan( - _ context.Context, _ *kv.Txn, _ roachpb.Spans, batchBytesLimit row.BytesLimit, - rowLimitHint row.RowLimit, traceKV bool, forceProductionKVBatchSize bool, + _ context.Context, _ *kv.Txn, _ roachpb.Spans, batchBytesLimit rowinfra.BytesLimit, + rowLimitHint rowinfra.RowLimit, traceKV bool, forceProductionKVBatchSize bool, ) error StartInconsistentScan( _ context.Context, @@ -41,8 +42,8 @@ type rowFetcher interface { initialTimestamp hlc.Timestamp, maxTimestampAge time.Duration, spans roachpb.Spans, - batchBytesLimit row.BytesLimit, - rowLimitHint row.RowLimit, + batchBytesLimit rowinfra.BytesLimit, + rowLimitHint rowinfra.RowLimit, traceKV bool, forceProductionKVBatchSize bool, ) error diff --git a/pkg/sql/rowexec/scrub_tablereader.go b/pkg/sql/rowexec/scrub_tablereader.go index 8a22dc172307..9a1b2f6134d7 100644 --- a/pkg/sql/rowexec/scrub_tablereader.go +++ b/pkg/sql/rowexec/scrub_tablereader.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/scrub" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" @@ -79,7 +80,7 @@ func newScrubTableReader( } tr.tableDesc = spec.BuildTableDescriptor() - tr.limitHint = row.RowLimit(execinfra.LimitHint(spec.LimitHint, post)) + tr.limitHint = rowinfra.RowLimit(execinfra.LimitHint(spec.LimitHint, post)) if err := tr.Init( tr, @@ -218,7 +219,7 @@ func (tr *scrubTableReader) Start(ctx context.Context) { log.VEventf(ctx, 1, "starting") if err := tr.fetcher.StartScan( - ctx, tr.FlowCtx.Txn, tr.spans, row.DefaultBatchBytesLimit, tr.limitHint, + ctx, tr.FlowCtx.Txn, tr.spans, rowinfra.DefaultBatchBytesLimit, tr.limitHint, tr.FlowCtx.TraceKV, tr.EvalCtx.TestingKnobs.ForceProductionBatchSizes, ); err != nil { tr.MoveToDraining(err) diff --git a/pkg/sql/rowexec/stats.go b/pkg/sql/rowexec/stats.go index 9d625e2c5868..568873592f75 100644 --- a/pkg/sql/rowexec/stats.go +++ b/pkg/sql/rowexec/stats.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -108,8 +109,8 @@ func (c *rowFetcherStatCollector) StartScan( ctx context.Context, txn *kv.Txn, spans roachpb.Spans, - batchBytesLimit row.BytesLimit, - limitHint row.RowLimit, + batchBytesLimit rowinfra.BytesLimit, + limitHint rowinfra.RowLimit, traceKV bool, forceProductionKVBatchSize bool, ) error { @@ -126,8 +127,8 @@ func (c *rowFetcherStatCollector) StartInconsistentScan( initialTimestamp hlc.Timestamp, maxTimestampAge time.Duration, spans roachpb.Spans, - batchBytesLimit row.BytesLimit, - limitHint row.RowLimit, + batchBytesLimit rowinfra.BytesLimit, + limitHint rowinfra.RowLimit, traceKV bool, forceProductionKVBatchSize bool, ) error { diff --git a/pkg/sql/rowexec/tablereader.go b/pkg/sql/rowexec/tablereader.go index e205a7d33dca..9b8a52c64175 100644 --- a/pkg/sql/rowexec/tablereader.go +++ b/pkg/sql/rowexec/tablereader.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/row" 
"github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/optional" "github.com/cockroachdb/errors" @@ -35,9 +36,9 @@ type tableReader struct { execinfra.ProcessorBase spans roachpb.Spans - limitHint row.RowLimit + limitHint rowinfra.RowLimit parallelize bool - batchBytesLimit row.BytesLimit + batchBytesLimit rowinfra.BytesLimit scanStarted bool @@ -86,17 +87,17 @@ func newTableReader( // just in case. spec.Parallelize = false } - var batchBytesLimit row.BytesLimit + var batchBytesLimit rowinfra.BytesLimit if !spec.Parallelize { - batchBytesLimit = row.BytesLimit(spec.BatchBytesLimit) + batchBytesLimit = rowinfra.BytesLimit(spec.BatchBytesLimit) if batchBytesLimit == 0 { - batchBytesLimit = row.DefaultBatchBytesLimit + batchBytesLimit = rowinfra.DefaultBatchBytesLimit } } tr := trPool.Get().(*tableReader) - tr.limitHint = row.RowLimit(execinfra.LimitHint(spec.LimitHint, post)) + tr.limitHint = rowinfra.RowLimit(execinfra.LimitHint(spec.LimitHint, post)) tr.parallelize = spec.Parallelize tr.batchBytesLimit = batchBytesLimit tr.maxTimestampAge = time.Duration(spec.MaxTimestampAgeNanos) @@ -204,9 +205,9 @@ func (tr *tableReader) Start(ctx context.Context) { func (tr *tableReader) startScan(ctx context.Context) error { limitBatches := !tr.parallelize - var bytesLimit row.BytesLimit + var bytesLimit rowinfra.BytesLimit if !limitBatches { - bytesLimit = row.NoBytesLimit + bytesLimit = rowinfra.NoBytesLimit } else { bytesLimit = tr.batchBytesLimit } diff --git a/pkg/sql/rowexec/zigzagjoiner.go b/pkg/sql/rowexec/zigzagjoiner.go index e6465f54cb92..8a599f2bc23d 100644 --- a/pkg/sql/rowexec/zigzagjoiner.go +++ b/pkg/sql/rowexec/zigzagjoiner.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/scrub" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/span" @@ -252,7 +253,7 @@ type zigzagJoiner struct { // be fetched at a time. Increasing this will improve performance for when // matched rows are grouped together, but increasing this too much will result // in fetching too many rows and therefore skipping less rows. 
-var zigzagJoinerBatchSize = row.RowLimit(util.ConstantWithMetamorphicTestValue( +var zigzagJoinerBatchSize = rowinfra.RowLimit(util.ConstantWithMetamorphicTestValue( "zig-zag-joiner-batch-size", 5, /* defaultValue */ 1, /* metamorphicValue */ @@ -793,7 +794,7 @@ func (z *zigzagJoiner) nextRow(ctx context.Context, txn *kv.Txn) (rowenc.EncDatu ctx, txn, roachpb.Spans{roachpb.Span{Key: curInfo.key, EndKey: curInfo.endKey}}, - row.DefaultBatchBytesLimit, + rowinfra.DefaultBatchBytesLimit, zigzagJoinerBatchSize, z.FlowCtx.TraceKV, z.EvalCtx.TestingKnobs.ForceProductionBatchSizes, @@ -938,7 +939,7 @@ func (z *zigzagJoiner) maybeFetchInitialRow() error { z.Ctx, z.FlowCtx.Txn, roachpb.Spans{roachpb.Span{Key: curInfo.key, EndKey: curInfo.endKey}}, - row.DefaultBatchBytesLimit, + rowinfra.DefaultBatchBytesLimit, zigzagJoinerBatchSize, z.FlowCtx.TraceKV, z.EvalCtx.TestingKnobs.ForceProductionBatchSizes, diff --git a/pkg/sql/rowinfra/base.go b/pkg/sql/rowinfra/base.go new file mode 100644 index 000000000000..d8a41d12e615 --- /dev/null +++ b/pkg/sql/rowinfra/base.go @@ -0,0 +1,43 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package rowinfra contains constants and types used by the row package +// that must also be accessible from other packages. +package rowinfra + +// RowLimit represents a response limit expressed in terms of number of result +// rows. RowLimits get ultimately converted to KeyLimits and are translated into +// BatchRequest.MaxSpanRequestKeys. +type RowLimit uint64 + +// KeyLimit represents a response limit expressed in terms of number of keys. +type KeyLimit int64 + +// BytesLimit represents a response limit expressed in terms of the size of the +// results. A BytesLimit ultimately translates into BatchRequest.TargetBytes. +type BytesLimit uint64 + +// NoRowLimit can be passed to Fetcher.StartScan to signify that the caller +// doesn't want to limit the number of result rows for each scan request. +const NoRowLimit RowLimit = 0 + +// NoBytesLimit can be passed to Fetcher.StartScan to signify that the caller +// doesn't want to limit the size of results for each scan request. +// +// See also DefaultBatchBytesLimit. +const NoBytesLimit BytesLimit = 0 + +// ProductionKVBatchSize is the kv batch size to use for production (i.e., +// non-test) clusters. +const ProductionKVBatchSize KeyLimit = 100000 + +// DefaultBatchBytesLimit is the maximum number of bytes a scan request can +// return. 
+const DefaultBatchBytesLimit BytesLimit = 10 << 20 // 10 MB diff --git a/pkg/sql/tablewriter_delete.go b/pkg/sql/tablewriter_delete.go index 57f11e056116..5ad2f0ea6885 100644 --- a/pkg/sql/tablewriter_delete.go +++ b/pkg/sql/tablewriter_delete.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -147,7 +148,7 @@ func (td *tableDeleter) deleteAllRowsScan( return resume, err } if err := rf.StartScan( - ctx, td.txn, roachpb.Spans{resume}, row.DefaultBatchBytesLimit, row.NoRowLimit, traceKV, td.forceProductionBatchSizes, + ctx, td.txn, roachpb.Spans{resume}, rowinfra.DefaultBatchBytesLimit, rowinfra.NoRowLimit, traceKV, td.forceProductionBatchSizes, ); err != nil { return resume, err } @@ -284,7 +285,7 @@ func (td *tableDeleter) deleteIndexScan( return resume, err } if err := rf.StartScan( - ctx, td.txn, roachpb.Spans{resume}, row.DefaultBatchBytesLimit, row.NoRowLimit, traceKV, td.forceProductionBatchSizes, + ctx, td.txn, roachpb.Spans{resume}, rowinfra.DefaultBatchBytesLimit, rowinfra.NoRowLimit, traceKV, td.forceProductionBatchSizes, ); err != nil { return resume, err }
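
As a reference for reviewers, the two limit-handling patterns that this patch
routes through the new package can be restated as standalone Go. The sketch
below is illustrative rather than part of the patch: the package and function
names are hypothetical, and only the rowinfra definitions from base.go above
are assumed.

package rowinfraexample

import "github.com/cockroachdb/cockroach/pkg/sql/rowinfra"

// firstBatchKeyLimit mirrors rowLimitToKeyLimit in row/fetcher.go and the
// equivalent logic in cfetcher.go's StartScan: a row-limit hint becomes a
// key limit (which ultimately becomes BatchRequest.MaxSpanRequestKeys) by
// multiplying by the maximum number of keys a single row can span, plus one
// extra key so the fetcher can tell that the final row was fetched in full.
func firstBatchKeyLimit(limitHint rowinfra.RowLimit, maxKeysPerRow int) rowinfra.KeyLimit {
	if limitHint == rowinfra.NoRowLimit {
		return 0 // no key limit on the first batch
	}
	return rowinfra.KeyLimit(int64(limitHint)*int64(maxKeysPerRow) + 1)
}

// batchBytesLimitForScan mirrors the pattern in colbatch_scan.go and
// tablereader.go: a parallelized scan must not set a bytes limit, because
// TargetBytes inhibits DistSender-level parallelism within a batch, while a
// serial scan falls back to DefaultBatchBytesLimit when the spec leaves the
// limit unset.
func batchBytesLimitForScan(parallelize bool, specLimit uint64) rowinfra.BytesLimit {
	if parallelize {
		return rowinfra.NoBytesLimit
	}
	if specLimit == 0 {
		return rowinfra.DefaultBatchBytesLimit
	}
	return rowinfra.BytesLimit(specLimit)
}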
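
The benefit the commit message describes is easiest to see in
rowexec/rowfetcher.go above: a package can now spell out scan-limit
parameters without importing row. A minimal sketch, assuming the post-patch
rowinfra package; the kvScanner interface and scanAllRows helper are
hypothetical, patterned on rowexec's rowFetcher.

package rowinfraexample

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
)

// kvScanner is a hypothetical trimmed-down analogue of rowexec's rowFetcher
// interface. Because the limit types now live in rowinfra, declaring it
// requires no dependency on pkg/sql/row.
type kvScanner interface {
	StartScan(
		ctx context.Context, txn *kv.Txn, spans roachpb.Spans,
		batchBytesLimit rowinfra.BytesLimit, rowLimitHint rowinfra.RowLimit,
		traceKV bool, forceProductionKVBatchSize bool,
	) error
}

// scanAllRows starts an unlimited scan, the same call shape the inverted
// joiner and index joiner use after this patch.
func scanAllRows(ctx context.Context, s kvScanner, txn *kv.Txn, spans roachpb.Spans) error {
	return s.StartScan(ctx, txn, spans, rowinfra.NoBytesLimit, rowinfra.NoRowLimit,
		false /* traceKV */, false /* forceProductionKVBatchSize */)
}

After the patch, (*row.Fetcher).StartScan matches this shape, as does the
stat collector wrapper in rowexec/stats.go, so either could satisfy such an
interface.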