78539: sql: clean up some long argument lists in fetchers r=jordanlewis a=jordanlewis

This PR is a pure refactor that replaces some overly-long argument lists with argument structs.
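For readers skimming the diff: this is the common Go "parameter struct" (options struct) pattern — a long positional argument list becomes a single struct, so call sites name each argument and can omit fields whose zero values are the defaults. A minimal standalone sketch of the pattern (hypothetical names, not code from this PR):

```go
package main

import (
	"fmt"
	"time"
)

// dialArgs bundles what used to be a long positional parameter list.
type dialArgs struct {
	Addr    string
	Timeout time.Duration
	Retries int
	Verbose bool
}

// dial takes one struct instead of four positional arguments.
func dial(args dialArgs) error {
	fmt.Printf("dialing %s (timeout=%v, retries=%d, verbose=%t)\n",
		args.Addr, args.Timeout, args.Retries, args.Verbose)
	return nil
}

func main() {
	// Call sites are self-documenting; unset fields (Retries, Verbose)
	// default to their Go zero values.
	_ = dial(dialArgs{Addr: "localhost:26257", Timeout: 5 * time.Second})
}
```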

Co-authored-by: Jordan Lewis <[email protected]>
craig[bot] and jordanlewis committed Apr 19, 2022 · 2 parents 1fdbb16 + 6cab233 · commit 9246ff5
Showing 16 changed files with 206 additions and 227 deletions.
11 changes: 4 additions & 7 deletions pkg/ccl/changefeedccl/rowfetcher_cache.go
@@ -223,13 +223,10 @@ func (c *rowFetcherCache) RowFetcherForColumnFamily(
 
   if err := rf.Init(
     context.TODO(),
-    false, /* reverse */
-    descpb.ScanLockingStrength_FOR_NONE,
-    descpb.ScanLockingWaitPolicy_BLOCK,
-    0, /* lockTimeout */
-    &c.a,
-    nil, /* memMonitor */
-    &spec,
+    row.FetcherInitArgs{
+      Alloc: &c.a,
+      Spec:  &spec,
+    },
   ); err != nil {
     return nil, err
   }
15 changes: 5 additions & 10 deletions pkg/ccl/cliccl/debug_backup.go
@@ -619,16 +619,11 @@ func makeRowFetcher(
   }
 
   var rf row.Fetcher
-  if err := rf.Init(
-    ctx,
-    false, /* reverse */
-    descpb.ScanLockingStrength_FOR_NONE,
-    descpb.ScanLockingWaitPolicy_BLOCK,
-    0, /* lockTimeout */
-    &tree.DatumAlloc{},
-    nil, /* mon.BytesMonitor */
-    &spec,
-  ); err != nil {
+  if err := rf.Init(ctx,
+    row.FetcherInitArgs{
+      Alloc: &tree.DatumAlloc{},
+      Spec:  &spec,
+    }); err != nil {
     return rf, err
   }
   return rf, nil
24 changes: 10 additions & 14 deletions pkg/sql/backfill/backfill.go
@@ -152,13 +152,11 @@ func (cb *ColumnBackfiller) init(
 
   return cb.fetcher.Init(
     evalCtx.Context,
-    false, /* reverse */
-    descpb.ScanLockingStrength_FOR_NONE,
-    descpb.ScanLockingWaitPolicy_BLOCK,
-    0, /* lockTimeout */
-    &cb.alloc,
-    cb.mon,
-    &spec,
+    row.FetcherInitArgs{
+      Alloc:      &cb.alloc,
+      MemMonitor: cb.mon,
+      Spec:       &spec,
+    },
   )
 }
 
@@ -848,13 +846,11 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
   var fetcher row.Fetcher
   if err := fetcher.Init(
     ib.evalCtx.Context,
-    false, /* reverse */
-    descpb.ScanLockingStrength_FOR_NONE,
-    descpb.ScanLockingWaitPolicy_BLOCK,
-    0, /* lockTimeout */
-    &ib.alloc,
-    ib.mon,
-    &spec,
+    row.FetcherInitArgs{
+      Alloc:      &ib.alloc,
+      MemMonitor: ib.mon,
+      Spec:       &spec,
+    },
   ); err != nil {
     return nil, nil, 0, err
   }
13 changes: 6 additions & 7 deletions pkg/sql/delete_preserving_index_test.go
@@ -776,13 +776,12 @@ func fetchIndex(
   const reverse = false
   require.NoError(t, fetcher.Init(
     ctx,
-    reverse,
-    descpb.ScanLockingStrength_FOR_NONE,
-    descpb.ScanLockingWaitPolicy_BLOCK,
-    0,
-    &alloc,
-    mm.Monitor(),
-    &spec,
+    row.FetcherInitArgs{
+      Reverse:    reverse,
+      Alloc:      &alloc,
+      MemMonitor: mm.Monitor(),
+      Spec:       &spec,
+    },
   ))
 
   require.NoError(t, fetcher.StartScan(
11 changes: 4 additions & 7 deletions pkg/sql/delete_range.go
@@ -98,13 +98,10 @@ func (d *deleteRangeNode) startExec(params runParams) error {
   }
   if err := d.fetcher.Init(
     params.ctx,
-    false, /* reverse */
-    descpb.ScanLockingStrength_FOR_NONE,
-    descpb.ScanLockingWaitPolicy_BLOCK,
-    0, /* lockTimeout */
-    params.p.alloc,
-    nil, /* memMonitor */
-    &spec,
+    row.FetcherInitArgs{
+      Alloc: params.p.alloc,
+      Spec:  &spec,
+    },
   ); err != nil {
     return err
   }
13 changes: 5 additions & 8 deletions pkg/sql/indexbackfiller_test.go
@@ -390,7 +390,6 @@ INSERT INTO foo VALUES (1), (10), (100);
 
   require.NoError(t, err)
   spans := []roachpb.Span{table.IndexSpan(keys.SystemSQLCodec, indexID)}
-  const reverse = false
   var fetcherCols []descpb.ColumnID
   for _, col := range table.PublicColumns() {
     if colIDsNeeded.Contains(col.GetID()) {
@@ -409,13 +408,11 @@
   var fetcher row.Fetcher
   require.NoError(t, fetcher.Init(
     ctx,
-    reverse,
-    descpb.ScanLockingStrength_FOR_NONE,
-    descpb.ScanLockingWaitPolicy_BLOCK,
-    0,
-    &alloc,
-    mm.Monitor(),
-    &spec,
+    row.FetcherInitArgs{
+      Alloc:      &alloc,
+      MemMonitor: mm.Monitor(),
+      Spec:       &spec,
+    },
   ))
 
   require.NoError(t, fetcher.StartScan(
11 changes: 4 additions & 7 deletions pkg/sql/row/errors.go
@@ -268,13 +268,10 @@ func DecodeRowInfo(
   rf.IgnoreUnexpectedNulls = true
   if err := rf.Init(
     ctx,
-    false, /* reverse */
-    descpb.ScanLockingStrength_FOR_NONE,
-    descpb.ScanLockingWaitPolicy_BLOCK,
-    0, /* lockTimeout */
-    &tree.DatumAlloc{},
-    nil, /* memMonitor */
-    &spec,
+    FetcherInitArgs{
+      Alloc: &tree.DatumAlloc{},
+      Spec:  &spec,
+    },
   ); err != nil {
     return nil, nil, nil, err
   }
124 changes: 64 additions & 60 deletions pkg/sql/row/fetcher.go
@@ -203,40 +203,40 @@ func (rf *Fetcher) Close(ctx context.Context) {
   }
 }
 
-// Init sets up a Fetcher for a given table and index. If we are using a
-// non-primary index, tables.ValNeededForCol can only refer to columns in the
-// index.
-func (rf *Fetcher) Init(
-  ctx context.Context,
-  reverse bool,
-  lockStrength descpb.ScanLockingStrength,
-  lockWaitPolicy descpb.ScanLockingWaitPolicy,
-  lockTimeout time.Duration,
-  alloc *tree.DatumAlloc,
-  memMonitor *mon.BytesMonitor,
-  spec *descpb.IndexFetchSpec,
-) error {
-  if spec.Version != descpb.IndexFetchSpecVersionInitial {
-    return errors.Newf("unsupported IndexFetchSpec version %d", spec.Version)
+// FetcherInitArgs contains arguments for Fetcher.Init.
+type FetcherInitArgs struct {
+  Reverse        bool
+  LockStrength   descpb.ScanLockingStrength
+  LockWaitPolicy descpb.ScanLockingWaitPolicy
+  LockTimeout    time.Duration
+  Alloc          *tree.DatumAlloc
+  MemMonitor     *mon.BytesMonitor
+  Spec           *descpb.IndexFetchSpec
+}
+
+// Init sets up a Fetcher for a given table and index.
+func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error {
+  if args.Spec.Version != descpb.IndexFetchSpecVersionInitial {
+    return errors.Newf("unsupported IndexFetchSpec version %d", args.Spec.Version)
   }
-  rf.reverse = reverse
-  rf.lockStrength = lockStrength
-  rf.lockWaitPolicy = lockWaitPolicy
-  rf.lockTimeout = lockTimeout
-  rf.alloc = alloc
-
-  if memMonitor != nil {
-    rf.mon = mon.NewMonitorInheritWithLimit("fetcher-mem", 0 /* limit */, memMonitor)
-    rf.mon.Start(ctx, memMonitor, mon.BoundAccount{})
+  rf.reverse = args.Reverse
+  rf.lockStrength = args.LockStrength
+  rf.lockWaitPolicy = args.LockWaitPolicy
+  rf.lockTimeout = args.LockTimeout
+  rf.alloc = args.Alloc
+
+  if args.MemMonitor != nil {
+    rf.mon = mon.NewMonitorInheritWithLimit("fetcher-mem", 0 /* limit */, args.MemMonitor)
+    rf.mon.Start(ctx, args.MemMonitor, mon.BoundAccount{})
     memAcc := rf.mon.MakeBoundAccount()
     rf.kvFetcherMemAcc = &memAcc
   }
 
   table := &rf.table
   *table = tableInfo{
-    spec:       *spec,
-    row:        make(rowenc.EncDatumRow, len(spec.FetchedColumns)),
-    decodedRow: make(tree.Datums, len(spec.FetchedColumns)),
+    spec:       *args.Spec,
+    row:        make(rowenc.EncDatumRow, len(args.Spec.FetchedColumns)),
+    decodedRow: make(tree.Datums, len(args.Spec.FetchedColumns)),
 
     // These slice fields might get re-allocated below, so reslice them from
     // the old table here in case they've got enough capacity already.
@@ -247,8 +247,8 @@ func (rf *Fetcher) Init(
     oidOutputIdx: noOutputColumn,
   }
 
-  for idx := range spec.FetchedColumns {
-    colID := spec.FetchedColumns[idx].ColumnID
+  for idx := range args.Spec.FetchedColumns {
+    colID := args.Spec.FetchedColumns[idx].ColumnID
     table.colIdxMap.Set(colID, idx)
     if colinfo.IsColIDSystemColumn(colID) {
       switch colinfo.GetSystemColumnKindFromColumnID(colID) {
@@ -258,13 +258,13 @@ func (rf *Fetcher) Init(
 
       case catpb.SystemColumnKind_TABLEOID:
         table.oidOutputIdx = idx
-        table.tableOid = tree.NewDOid(tree.DInt(spec.TableID))
+        table.tableOid = tree.NewDOid(tree.DInt(args.Spec.TableID))
       }
     }
   }
 
-  if len(spec.FetchedColumns) > 0 {
-    table.neededValueColsByIdx.AddRange(0, len(spec.FetchedColumns)-1)
+  if len(args.Spec.FetchedColumns) > 0 {
+    table.neededValueColsByIdx.AddRange(0, len(args.Spec.FetchedColumns)-1)
   }
 
   nExtraCols := 0
@@ -273,7 +273,7 @@ func (rf *Fetcher) Init(
   if table.spec.IsSecondaryIndex && table.spec.IsUniqueIndex {
     nExtraCols = int(table.spec.NumKeySuffixColumns)
   }
-  nIndexCols := len(spec.KeyAndSuffixColumns) - nExtraCols
+  nIndexCols := len(args.Spec.KeyAndSuffixColumns) - nExtraCols
 
   neededIndexCols := 0
   compositeIndexCols := 0
@@ -283,7 +283,7 @@ func (rf *Fetcher) Init(
     table.indexColIdx = make([]int, nIndexCols)
   }
   for i := 0; i < nIndexCols; i++ {
-    id := spec.KeyAndSuffixColumns[i].ColumnID
+    id := args.Spec.KeyAndSuffixColumns[i].ColumnID
     colIdx, ok := table.colIdxMap.Get(id)
     if ok {
       table.indexColIdx[i] = colIdx
@@ -292,7 +292,7 @@ func (rf *Fetcher) Init(
     } else {
      table.indexColIdx[i] = -1
     }
-    if spec.KeyAndSuffixColumns[i].IsComposite {
+    if args.Spec.KeyAndSuffixColumns[i].IsComposite {
      compositeIndexCols++
     }
   }
@@ -304,7 +304,7 @@ func (rf *Fetcher) Init(
   // The number of columns we need to read from the value part of the key.
   // It's the total number of needed columns minus the ones we read from the
   // index key, except for composite columns.
-  table.neededValueCols = len(spec.FetchedColumns) - neededIndexCols + compositeIndexCols
+  table.neededValueCols = len(args.Spec.FetchedColumns) - neededIndexCols + compositeIndexCols
 
   if cap(table.keyVals) >= nIndexCols {
     table.keyVals = table.keyVals[:nIndexCols]
@@ -360,18 +360,20 @@ func (rf *Fetcher) StartScan(
 
   f, err := makeKVBatchFetcher(
     ctx,
-    makeKVBatchFetcherDefaultSendFunc(txn),
-    spans,
-    rf.reverse,
-    batchBytesLimit,
-    rf.rowLimitToKeyLimit(rowLimitHint),
-    rf.lockStrength,
-    rf.lockWaitPolicy,
-    rf.lockTimeout,
-    rf.kvFetcherMemAcc,
-    forceProductionKVBatchSize,
-    txn.AdmissionHeader(),
-    txn.DB().SQLKVResponseAdmissionQ,
+    kvBatchFetcherArgs{
+      sendFn:                     makeKVBatchFetcherDefaultSendFunc(txn),
+      spans:                      spans,
+      reverse:                    rf.reverse,
+      batchBytesLimit:            batchBytesLimit,
+      firstBatchKeyLimit:         rf.rowLimitToKeyLimit(rowLimitHint),
+      lockStrength:               rf.lockStrength,
+      lockWaitPolicy:             rf.lockWaitPolicy,
+      lockTimeout:                rf.lockTimeout,
+      acc:                        rf.kvFetcherMemAcc,
+      forceProductionKVBatchSize: forceProductionKVBatchSize,
+      requestAdmissionHeader:     txn.AdmissionHeader(),
+      responseAdmissionQ:         txn.DB().SQLKVResponseAdmissionQ,
+    },
   )
   if err != nil {
     return err
@@ -461,18 +463,20 @@ func (rf *Fetcher) StartInconsistentScan(
 
   f, err := makeKVBatchFetcher(
     ctx,
-    sendFunc(sendFn),
-    spans,
-    rf.reverse,
-    batchBytesLimit,
-    rf.rowLimitToKeyLimit(rowLimitHint),
-    rf.lockStrength,
-    rf.lockWaitPolicy,
-    rf.lockTimeout,
-    rf.kvFetcherMemAcc,
-    forceProductionKVBatchSize,
-    txn.AdmissionHeader(),
-    txn.DB().SQLKVResponseAdmissionQ,
+    kvBatchFetcherArgs{
+      sendFn:                     sendFn,
+      spans:                      spans,
+      reverse:                    rf.reverse,
+      batchBytesLimit:            batchBytesLimit,
+      firstBatchKeyLimit:         rf.rowLimitToKeyLimit(rowLimitHint),
+      lockStrength:               rf.lockStrength,
+      lockWaitPolicy:             rf.lockWaitPolicy,
+      lockTimeout:                rf.lockTimeout,
+      acc:                        rf.kvFetcherMemAcc,
+      forceProductionKVBatchSize: forceProductionKVBatchSize,
+      requestAdmissionHeader:     txn.AdmissionHeader(),
+      responseAdmissionQ:         txn.DB().SQLKVResponseAdmissionQ,
+    },
   )
   if err != nil {
     return err
11 changes: 4 additions & 7 deletions pkg/sql/row/fetcher_mvcc_test.go
@@ -91,13 +91,10 @@ func TestRowFetcherMVCCMetadata(t *testing.T) {
   var rf row.Fetcher
   if err := rf.Init(
     ctx,
-    false, /* reverse */
-    descpb.ScanLockingStrength_FOR_NONE,
-    descpb.ScanLockingWaitPolicy_BLOCK,
-    0, /* lockTimeout */
-    &tree.DatumAlloc{},
-    nil, /* memMonitor */
-    &spec,
+    row.FetcherInitArgs{
+      Alloc: &tree.DatumAlloc{},
+      Spec:  &spec,
+    },
   ); err != nil {
     t.Fatal(err)
   }
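Across the converted call sites, the main payoff is visible: any FetcherInitArgs field left unset takes its Go zero value, which the callers above rely on in place of the old explicit defaults (reverse=false, ScanLockingStrength_FOR_NONE, ScanLockingWaitPolicy_BLOCK, no lock timeout, no memory monitor). A minimal sketch of the new call shape, using a hypothetical initFetcher helper that is not part of this PR:

```go
import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
	"github.com/cockroachdb/cockroach/pkg/sql/row"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

// initFetcher is a hypothetical helper showing the new call shape.
// Fields omitted from FetcherInitArgs (Reverse, LockStrength,
// LockWaitPolicy, LockTimeout, MemMonitor) default to their zero
// values, matching the arguments the old call sites passed explicitly.
func initFetcher(ctx context.Context, spec *descpb.IndexFetchSpec) (*row.Fetcher, error) {
	var rf row.Fetcher
	if err := rf.Init(ctx, row.FetcherInitArgs{
		Alloc: &tree.DatumAlloc{}, // scratch allocator for decoded datums
		Spec:  spec,               // describes the index and columns to fetch
	}); err != nil {
		return nil, err
	}
	return &rf, nil
}
```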