Skip to content

Commit

Permalink
sql: clean up Fetcher interfaces a bit
Browse files Browse the repository at this point in the history
This commit removes a couple of arguments (`traceKV`,
`forceProductionBatchSize`) from `StartScan*` fetcher methods in
favor of passing them on `Init`. Additionally, several fields are
removed from `row.Fetcher` in favor of accessing the args struct
directly.

The only meaningful change is that now we correctly propagate `traceKV`
flag in the column backfiller code path when it is set up in
a distributed case.

Release note: None
  • Loading branch information
yuzefovich committed Jun 22, 2022
1 parent 8601bed commit fecfaed
Show file tree
Hide file tree
Showing 20 changed files with 131 additions and 160 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdcevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (d *eventDecoder) DecodeKV(

d.kvFetcher.KVs = d.kvFetcher.KVs[:0]
d.kvFetcher.KVs = append(d.kvFetcher.KVs, kv)
if err := d.fetcher.StartScanFrom(ctx, &d.kvFetcher, false /* traceKV */); err != nil {
if err := d.fetcher.StartScanFrom(ctx, &d.kvFetcher); err != nil {
return Row{}, err
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/ccl/cliccl/debug_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,11 +624,13 @@ func makeRowFetcher(
}

var rf row.Fetcher
if err := rf.Init(ctx,
if err := rf.Init(
ctx,
row.FetcherInitArgs{
Alloc: &tree.DatumAlloc{},
Spec: &spec,
}); err != nil {
},
); err != nil {
return rf, err
}
return rf, nil
Expand Down Expand Up @@ -667,7 +669,7 @@ func processEntryFiles(
}
kvFetcher := row.MakeBackupSSTKVFetcher(startKeyMVCC, endKeyMVCC, iter, startTime, endTime, debugBackupArgs.withRevisions)

if err := rf.StartScanFrom(ctx, &kvFetcher, false /* traceKV */); err != nil {
if err := rf.StartScanFrom(ctx, &kvFetcher); err != nil {
return errors.Wrapf(err, "row fetcher starts scan")
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -2586,17 +2586,18 @@ func columnBackfillInTxn(
rowMetrics := execCfg.GetRowMetrics(evalCtx.SessionData().Internal)
var backfiller backfill.ColumnBackfiller
if err := backfiller.InitForLocalUse(
ctx, evalCtx, semaCtx, tableDesc, columnBackfillerMon, rowMetrics,
ctx, evalCtx, semaCtx, tableDesc, columnBackfillerMon, rowMetrics, traceKV,
); err != nil {
return err
}
defer backfiller.Close(ctx)
sp := tableDesc.PrimaryIndexSpan(evalCtx.Codec)
for sp.Key != nil {
var err error
sp.Key, err = backfiller.RunColumnBackfillChunk(ctx,
txn, tableDesc, sp, rowinfra.RowLimit(columnBackfillBatchSize.Get(&evalCtx.Settings.SV)),
false /*alsoCommit*/, traceKV)
sp.Key, err = backfiller.RunColumnBackfillChunk(
ctx, txn, tableDesc, sp, rowinfra.RowLimit(columnBackfillBatchSize.Get(&evalCtx.Settings.SV)),
false /*alsoCommit*/, traceKV,
)
if err != nil {
return err
}
Expand Down
12 changes: 8 additions & 4 deletions pkg/sql/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func (cb *ColumnBackfiller) init(
desc catalog.TableDescriptor,
mon *mon.BytesMonitor,
rowMetrics *rowinfra.Metrics,
traceKV bool,
) error {
cb.evalCtx = evalCtx
cb.updateCols = append(cb.added, cb.dropped...)
Expand Down Expand Up @@ -156,6 +157,7 @@ func (cb *ColumnBackfiller) init(
Alloc: &cb.alloc,
MemMonitor: cb.mon,
Spec: &spec,
TraceKV: traceKV,
},
)
}
Expand All @@ -170,6 +172,7 @@ func (cb *ColumnBackfiller) InitForLocalUse(
desc catalog.TableDescriptor,
mon *mon.BytesMonitor,
rowMetrics *rowinfra.Metrics,
traceKV bool,
) error {
cb.initCols(desc)
defaultExprs, err := schemaexpr.MakeDefaultExprs(
Expand All @@ -190,7 +193,7 @@ func (cb *ColumnBackfiller) InitForLocalUse(
if err != nil {
return err
}
return cb.init(evalCtx, defaultExprs, computedExprs, desc, mon, rowMetrics)
return cb.init(evalCtx, defaultExprs, computedExprs, desc, mon, rowMetrics, traceKV)
}

// InitForDistributedUse initializes a ColumnBackfiller for use as part of a
Expand Down Expand Up @@ -247,7 +250,7 @@ func (cb *ColumnBackfiller) InitForDistributedUse(
flowCtx.Descriptors.ReleaseAll(ctx)

rowMetrics := flowCtx.GetRowMetrics()
return cb.init(evalCtx, defaultExprs, computedExprs, desc, mon, rowMetrics)
return cb.init(evalCtx, defaultExprs, computedExprs, desc, mon, rowMetrics, flowCtx.TraceKV)
}

// Close frees the resources used by the ColumnBackfiller.
Expand Down Expand Up @@ -310,7 +313,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
if err := cb.fetcher.StartScan(
ctx, txn, []roachpb.Span{sp}, nil, /* spanIDs */
rowinfra.GetDefaultBatchBytesLimit(false /* forceProductionValue */),
chunkSize, traceKV, false, /* forceProductionKVBatchSize */
chunkSize,
); err != nil {
log.Errorf(ctx, "scan error: %s", err)
return roachpb.Key{}, err
Expand Down Expand Up @@ -804,6 +807,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
Alloc: &ib.alloc,
MemMonitor: ib.mon,
Spec: &spec,
TraceKV: traceKV,
},
); err != nil {
return nil, nil, 0, err
Expand All @@ -812,7 +816,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
if err := fetcher.StartScan(
ctx, txn, []roachpb.Span{sp}, nil, /* spanIDs */
rowinfra.GetDefaultBatchBytesLimit(false /* forceProductionValue */),
initBufferSize, traceKV, false, /* forceProductionKVBatchSize */
initBufferSize,
); err != nil {
log.Errorf(ctx, "scan error: %s", err)
return nil, nil, 0, err
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colfetcher/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ type cFetcherArgs struct {
reverse bool
// traceKV indicates whether or not session tracing is enabled. It is set
// when initializing the fetcher.
traceKV bool
traceKV bool
forceProductionKVBatchSize bool
}

// noOutputColumn is a sentinel value to denote that a system column is not
Expand Down Expand Up @@ -511,7 +512,6 @@ func (cf *cFetcher) StartScan(
limitBatches bool,
batchBytesLimit rowinfra.BytesLimit,
limitHint rowinfra.RowLimit,
forceProductionKVBatchSize bool,
) error {
if len(spans) == 0 {
return errors.AssertionFailedf("no spans")
Expand Down Expand Up @@ -558,7 +558,7 @@ func (cf *cFetcher) StartScan(
cf.lockWaitPolicy,
cf.lockTimeout,
cf.kvFetcherMemAcc,
forceProductionKVBatchSize,
cf.forceProductionKVBatchSize,
)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colfetcher/colbatch_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func (s *ColBatchScan) Init(ctx context.Context) {
limitBatches,
s.batchBytesLimit,
s.limitHint,
s.flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
); err != nil {
colexecerror.InternalError(err)
}
Expand Down Expand Up @@ -204,6 +203,7 @@ func NewColBatchScan(
estimatedRowCount,
spec.Reverse,
flowCtx.TraceKV,
flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
}

if err = fetcher.Init(allocator, kvFetcherMemAcc, tableArgs); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colfetcher/index_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ func (s *ColIndexJoin) Next() coldata.Batch {
false, /* limitBatches */
rowinfra.NoBytesLimit,
rowinfra.NoRowLimit,
s.flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
)
}
if err != nil {
Expand Down Expand Up @@ -552,6 +551,7 @@ func NewColIndexJoin(
0, /* estimatedRowCount */
false, /* reverse */
flowCtx.TraceKV,
flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
}
if err = fetcher.Init(
fetcherAllocator, kvFetcherMemAcc, tableArgs,
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/delete_preserving_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,11 +782,12 @@ func fetchIndex(
Alloc: &alloc,
MemMonitor: mm.Monitor(),
Spec: &spec,
TraceKV: true,
},
))

require.NoError(t, fetcher.StartScan(
ctx, txn, spans, nil /* spanIDs */, rowinfra.NoBytesLimit, 0, true, false, /* forceProductionBatchSize */
ctx, txn, spans, nil /* spanIDs */, rowinfra.NoBytesLimit, 0,
))
var rows []tree.Datums
for {
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/indexbackfiller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,11 +412,12 @@ INSERT INTO foo VALUES (1), (10), (100);
Alloc: &alloc,
MemMonitor: mm.Monitor(),
Spec: &spec,
TraceKV: true,
},
))

require.NoError(t, fetcher.StartScan(
ctx, txn, spans, nil /* spanIDs */, rowinfra.NoBytesLimit, 0, true, false, /* forceProductionBatchSize */
ctx, txn, spans, nil /* spanIDs */, rowinfra.NoBytesLimit, 0,
))
var rows []tree.Datums
for {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/row/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func DecodeRowInfo(
}
// Use the Fetcher to decode the single kv pair above by passing in
// this singleKVFetcher implementation, which doesn't actually hit KV.
if err := rf.StartScanFrom(ctx, &f, false /* traceKV */); err != nil {
if err := rf.StartScanFrom(ctx, &f); err != nil {
return nil, nil, nil, err
}
datums, err := rf.NextRowDecoded(ctx)
Expand Down
Loading

0 comments on commit fecfaed

Please sign in to comment.