Skip to content

Commit

Permalink
row,rowinfra: refactor some types and constants out of row
Browse files Browse the repository at this point in the history
This commit creates a new package called rowinfra and moves some
constants and type definitions from the row package to the new
package so that they can be used externally without adding a
dependency on the row package.

Note: I don't really like the name rowinfra, but I think it works
given the current name of the row package. I think it would be better
to rename row to something that hints at it's actual purpose, which
seems to be as the interface between the kv and sql layers. If we
rename row, then we can rename rowinfra to match.

Release note: None

Release justification: Low risk, high benefit change to existing
functionality that is needed for a follow-on commit.
  • Loading branch information
rytaft committed Aug 27, 2021
1 parent 36d95e4 commit 1736341
Show file tree
Hide file tree
Showing 21 changed files with 145 additions and 114 deletions.
3 changes: 2 additions & 1 deletion pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
Expand Down Expand Up @@ -2422,7 +2423,7 @@ func columnBackfillInTxn(
for sp.Key != nil {
var err error
sp.Key, err = backfiller.RunColumnBackfillChunk(ctx,
txn, tableDesc, sp, row.RowLimit(columnBackfillBatchSize.Get(&evalCtx.Settings.SV)),
txn, tableDesc, sp, rowinfra.RowLimit(columnBackfillBatchSize.Get(&evalCtx.Settings.SV)),
false /*alsoCommit*/, traceKV)
if err != nil {
return err
Expand Down
7 changes: 4 additions & 3 deletions pkg/sql/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sem/transform"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
Expand Down Expand Up @@ -247,7 +248,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
txn *kv.Txn,
tableDesc catalog.TableDescriptor,
sp roachpb.Span,
chunkSize row.RowLimit,
chunkSize rowinfra.RowLimit,
alsoCommit bool,
traceKV bool,
) (roachpb.Key, error) {
Expand Down Expand Up @@ -289,7 +290,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
// populated and deleted by the OLTP commands but not otherwise
// read or used
if err := cb.fetcher.StartScan(
ctx, txn, []roachpb.Span{sp}, row.DefaultBatchBytesLimit, chunkSize,
ctx, txn, []roachpb.Span{sp}, rowinfra.DefaultBatchBytesLimit, chunkSize,
traceKV, false, /* forceProductionKVBatchSize */
); err != nil {
log.Errorf(ctx, "scan error: %s", err)
Expand Down Expand Up @@ -811,7 +812,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
}
defer fetcher.Close(ctx)
if err := fetcher.StartScan(
ctx, txn, []roachpb.Span{sp}, row.DefaultBatchBytesLimit, initBufferSize,
ctx, txn, []roachpb.Span{sp}, rowinfra.DefaultBatchBytesLimit, initBufferSize,
traceKV, false, /* forceProductionKVBatchSize */
); err != nil {
log.Errorf(ctx, "scan error: %s", err)
Expand Down
11 changes: 6 additions & 5 deletions pkg/sql/colfetcher/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/scrub"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
Expand Down Expand Up @@ -664,15 +665,15 @@ func (rf *cFetcher) StartScan(
spans roachpb.Spans,
bsHeader *roachpb.BoundedStalenessHeader,
limitBatches bool,
batchBytesLimit row.BytesLimit,
limitHint row.RowLimit,
batchBytesLimit rowinfra.BytesLimit,
limitHint rowinfra.RowLimit,
traceKV bool,
forceProductionKVBatchSize bool,
) error {
if len(spans) == 0 {
return errors.AssertionFailedf("no spans")
}
if !limitBatches && batchBytesLimit != row.NoBytesLimit {
if !limitBatches && batchBytesLimit != rowinfra.NoBytesLimit {
return errors.AssertionFailedf("batchBytesLimit set without limitBatches")
}

Expand All @@ -681,13 +682,13 @@ func (rf *cFetcher) StartScan(
// If we have a limit hint, we limit the first batch size. Subsequent
// batches get larger to avoid making things too slow (e.g. in case we have
// a very restrictive filter and actually have to retrieve a lot of rows).
firstBatchLimit := row.KeyLimit(limitHint)
firstBatchLimit := rowinfra.KeyLimit(limitHint)
if firstBatchLimit != 0 {
// The limitHint is a row limit, but each row could be made up
// of more than one key. We take the maximum possible keys
// per row out of all the table rows we could potentially
// scan over.
firstBatchLimit = row.KeyLimit(int(limitHint) * rf.maxKeysPerRow)
firstBatchLimit = rowinfra.KeyLimit(int(limitHint) * rf.maxKeysPerRow)
// We need an extra key to make sure we form the last row.
firstBatchLimit++
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
Expand Down Expand Up @@ -52,8 +52,8 @@ type ColBatchScan struct {
flowCtx *execinfra.FlowCtx
bsHeader *roachpb.BoundedStalenessHeader
rf *cFetcher
limitHint row.RowLimit
batchBytesLimit row.BytesLimit
limitHint rowinfra.RowLimit
batchBytesLimit rowinfra.BytesLimit
parallelize bool
// tracingSpan is created when the stats should be collected for the query
// execution, and it will be finished when closing the operator.
Expand Down Expand Up @@ -194,7 +194,7 @@ func NewColBatchScan(
return nil, errors.AssertionFailedf("attempting to create a cFetcher with the IsCheck flag set")
}

limitHint := row.RowLimit(execinfra.LimitHint(spec.LimitHint, post))
limitHint := rowinfra.RowLimit(execinfra.LimitHint(spec.LimitHint, post))
// TODO(ajwerner): The need to construct an immutable here
// indicates that we're probably doing this wrong. Instead we should be
// just setting the ID and Version in the spec or something like that and
Expand Down Expand Up @@ -261,11 +261,11 @@ func NewColBatchScan(
// just in case.
spec.Parallelize = false
}
var batchBytesLimit row.BytesLimit
var batchBytesLimit rowinfra.BytesLimit
if !spec.Parallelize {
batchBytesLimit = row.BytesLimit(spec.BatchBytesLimit)
batchBytesLimit = rowinfra.BytesLimit(spec.BatchBytesLimit)
if batchBytesLimit == 0 {
batchBytesLimit = row.DefaultBatchBytesLimit
batchBytesLimit = rowinfra.DefaultBatchBytesLimit
}
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colfetcher/index_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/memsize"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
Expand Down Expand Up @@ -172,8 +172,8 @@ func (s *ColIndexJoin) Next() coldata.Batch {
spans,
nil, /* bsHeader */
false, /* limitBatches */
row.NoBytesLimit,
row.NoRowLimit,
rowinfra.NoBytesLimit,
rowinfra.NoRowLimit,
s.flowCtx.TraceKV,
s.flowCtx.EvalCtx.TestingKnobs.ForceProductionBatchSizes,
); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/indexbackfiller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/sql/types"
Expand Down Expand Up @@ -400,7 +401,7 @@ INSERT INTO foo VALUES (1), (10), (100);
))

require.NoError(t, fetcher.StartScan(
ctx, txn, spans, row.NoBytesLimit, 0, true, false, /* forceProductionBatchSize */
ctx, txn, spans, rowinfra.NoBytesLimit, 0, true, false, /* forceProductionBatchSize */
))
var rows []tree.Datums
for {
Expand Down
35 changes: 7 additions & 28 deletions pkg/sql/row/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/scrub"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
Expand Down Expand Up @@ -563,28 +564,6 @@ func (rf *Fetcher) GetTables() []catalog.Descriptor {
return ret
}

// RowLimit represents a response limit expressed in terms of number of result
// rows. RowLimits get ultimately converted to KeyLimits and are translated into
// BatchRequest.MaxSpanRequestKeys.
type RowLimit uint64

// KeyLimit represents a response limit expressed in terms of number of keys.
type KeyLimit int64

// BytesLimit represents a response limit expressed in terms of the size of the
// results. A BytesLimit ultimately translates into BatchRequest.TargetBytes.
type BytesLimit uint64

// NoRowLimit can be passed to Fetcher.StartScan to signify that the caller
// doesn't want to limit the number of result rows for each scan request.
const NoRowLimit RowLimit = 0

// NoBytesLimit can be passed to Fetcher.StartScan to signify that the caller
// doesn't want to limit the size of results for each scan request.
//
// See also DefaultBatchBytesLimit.
const NoBytesLimit BytesLimit = 0

// StartScan initializes and starts the key-value scan. Can be used multiple
// times.
//
Expand All @@ -606,8 +585,8 @@ func (rf *Fetcher) StartScan(
ctx context.Context,
txn *kv.Txn,
spans roachpb.Spans,
batchBytesLimit BytesLimit,
rowLimitHint RowLimit,
batchBytesLimit rowinfra.BytesLimit,
rowLimitHint rowinfra.RowLimit,
traceKV bool,
forceProductionKVBatchSize bool,
) error {
Expand Down Expand Up @@ -658,8 +637,8 @@ func (rf *Fetcher) StartInconsistentScan(
initialTimestamp hlc.Timestamp,
maxTimestampAge time.Duration,
spans roachpb.Spans,
batchBytesLimit BytesLimit,
rowLimitHint RowLimit,
batchBytesLimit rowinfra.BytesLimit,
rowLimitHint rowinfra.RowLimit,
traceKV bool,
forceProductionKVBatchSize bool,
) error {
Expand Down Expand Up @@ -736,7 +715,7 @@ func (rf *Fetcher) StartInconsistentScan(
return rf.StartScanFrom(ctx, &f)
}

func (rf *Fetcher) rowLimitToKeyLimit(rowLimitHint RowLimit) KeyLimit {
func (rf *Fetcher) rowLimitToKeyLimit(rowLimitHint rowinfra.RowLimit) rowinfra.KeyLimit {
if rowLimitHint == 0 {
return 0
}
Expand All @@ -748,7 +727,7 @@ func (rf *Fetcher) rowLimitToKeyLimit(rowLimitHint RowLimit) KeyLimit {
// rows we could potentially scan over.
//
// We add an extra key to make sure we form the last row.
return KeyLimit(int64(rowLimitHint)*int64(rf.maxKeysPerRow) + 1)
return rowinfra.KeyLimit(int64(rowLimitHint)*int64(rf.maxKeysPerRow) + 1)
}

// StartScanFrom initializes and starts a scan from the given kvBatchFetcher. Can be
Expand Down
17 changes: 9 additions & 8 deletions pkg/sql/row/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -164,8 +165,8 @@ func TestNextRowSingle(t *testing.T) {
context.Background(),
kv.NewTxn(ctx, kvDB, 0),
roachpb.Spans{tableDesc.IndexSpan(keys.SystemSQLCodec, tableDesc.GetPrimaryIndexID())},
NoBytesLimit,
NoRowLimit,
rowinfra.NoBytesLimit,
rowinfra.NoRowLimit,
false, /*traceKV*/
false, /*forceProductionKVBatchSize*/
); err != nil {
Expand Down Expand Up @@ -285,7 +286,7 @@ func TestNextRowBatchLimiting(t *testing.T) {
context.Background(),
kv.NewTxn(ctx, kvDB, 0),
roachpb.Spans{tableDesc.IndexSpan(keys.SystemSQLCodec, tableDesc.GetPrimaryIndexID())},
DefaultBatchBytesLimit,
rowinfra.DefaultBatchBytesLimit,
10, /*limitHint*/
false, /*traceKV*/
false, /*forceProductionKVBatchSize*/
Expand Down Expand Up @@ -396,8 +397,8 @@ func TestRowFetcherMemoryLimits(t *testing.T) {
context.Background(),
kv.NewTxn(ctx, kvDB, 0),
roachpb.Spans{tableDesc.IndexSpan(keys.SystemSQLCodec, tableDesc.GetPrimaryIndexID())},
NoBytesLimit,
NoRowLimit,
rowinfra.NoBytesLimit,
rowinfra.NoRowLimit,
false, /*traceKV*/
false, /*forceProductionKVBatchSize*/
)
Expand Down Expand Up @@ -482,7 +483,7 @@ INDEX(c)
roachpb.Spans{indexSpan,
roachpb.Span{Key: midKey, EndKey: endKey},
},
DefaultBatchBytesLimit,
rowinfra.DefaultBatchBytesLimit,
// Set a limitHint of 1 to more quickly end the first batch, causing a
// batch that ends between rows.
1, /*limitHint*/
Expand Down Expand Up @@ -645,8 +646,8 @@ func TestNextRowSecondaryIndex(t *testing.T) {
context.Background(),
kv.NewTxn(ctx, kvDB, 0),
roachpb.Spans{tableDesc.IndexSpan(keys.SystemSQLCodec, tableDesc.PublicNonPrimaryIndexes()[0].GetID())},
NoBytesLimit,
NoRowLimit,
rowinfra.NoBytesLimit,
rowinfra.NoRowLimit,
false, /*traceKV*/
false, /*forceProductionKVBatchSize*/
); err != nil {
Expand Down
Loading

0 comments on commit 1736341

Please sign in to comment.