diff --git a/pkg/ccl/changefeedccl/cdcevent/event.go b/pkg/ccl/changefeedccl/cdcevent/event.go
index a54d968c95f3..8a5424ea3193 100644
--- a/pkg/ccl/changefeedccl/cdcevent/event.go
+++ b/pkg/ccl/changefeedccl/cdcevent/event.go
@@ -387,7 +387,7 @@ func (d *eventDecoder) DecodeKV(
 	d.kvFetcher.KVs = d.kvFetcher.KVs[:0]
 	d.kvFetcher.KVs = append(d.kvFetcher.KVs, kv)

-	if err := d.fetcher.StartScanFrom(ctx, &d.kvFetcher, false /* traceKV */); err != nil {
+	if err := d.fetcher.StartScanFrom(ctx, &d.kvFetcher); err != nil {
 		return Row{}, err
 	}

diff --git a/pkg/ccl/cliccl/debug_backup.go b/pkg/ccl/cliccl/debug_backup.go
index 64f68a8ace98..c8c4d4300485 100644
--- a/pkg/ccl/cliccl/debug_backup.go
+++ b/pkg/ccl/cliccl/debug_backup.go
@@ -624,11 +624,13 @@ func makeRowFetcher(
 	}

 	var rf row.Fetcher
-	if err := rf.Init(ctx,
+	if err := rf.Init(
+		ctx,
 		row.FetcherInitArgs{
 			Alloc: &tree.DatumAlloc{},
 			Spec:  &spec,
-		}); err != nil {
+		},
+	); err != nil {
 		return rf, err
 	}
 	return rf, nil
@@ -667,7 +669,7 @@ func processEntryFiles(
 	}
 	kvFetcher := row.MakeBackupSSTKVFetcher(startKeyMVCC, endKeyMVCC, iter, startTime, endTime, debugBackupArgs.withRevisions)

-	if err := rf.StartScanFrom(ctx, &kvFetcher, false /* traceKV */); err != nil {
+	if err := rf.StartScanFrom(ctx, &kvFetcher); err != nil {
 		return errors.Wrapf(err, "row fetcher starts scan")
 	}

diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go
index f5af90ae8223..fdeab85213d9 100644
--- a/pkg/sql/backfill.go
+++ b/pkg/sql/backfill.go
@@ -2586,7 +2586,7 @@ func columnBackfillInTxn(
 	rowMetrics := execCfg.GetRowMetrics(evalCtx.SessionData().Internal)
 	var backfiller backfill.ColumnBackfiller
 	if err := backfiller.InitForLocalUse(
-		ctx, evalCtx, semaCtx, tableDesc, columnBackfillerMon, rowMetrics,
+		ctx, evalCtx, semaCtx, tableDesc, columnBackfillerMon, rowMetrics, traceKV,
 	); err != nil {
 		return err
 	}
@@ -2594,9 +2594,10 @@ func columnBackfillInTxn(
 	sp := tableDesc.PrimaryIndexSpan(evalCtx.Codec)
 	for sp.Key != nil {
 		var err error
-		sp.Key, err = backfiller.RunColumnBackfillChunk(ctx,
-			txn, tableDesc, sp, rowinfra.RowLimit(columnBackfillBatchSize.Get(&evalCtx.Settings.SV)),
-			false /*alsoCommit*/, traceKV)
+		sp.Key, err = backfiller.RunColumnBackfillChunk(
+			ctx, txn, tableDesc, sp, rowinfra.RowLimit(columnBackfillBatchSize.Get(&evalCtx.Settings.SV)),
+			false /*alsoCommit*/, traceKV,
+		)
 		if err != nil {
 			return err
 		}
diff --git a/pkg/sql/backfill/backfill.go b/pkg/sql/backfill/backfill.go
index cae733632ac0..57febb191ed5 100644
--- a/pkg/sql/backfill/backfill.go
+++ b/pkg/sql/backfill/backfill.go
@@ -112,6 +112,7 @@ func (cb *ColumnBackfiller) init(
 	desc catalog.TableDescriptor,
 	mon *mon.BytesMonitor,
 	rowMetrics *rowinfra.Metrics,
+	traceKV bool,
 ) error {
 	cb.evalCtx = evalCtx
 	cb.updateCols = append(cb.added, cb.dropped...)
@@ -156,6 +157,7 @@ func (cb *ColumnBackfiller) init(
 			Alloc:      &cb.alloc,
 			MemMonitor: cb.mon,
 			Spec:       &spec,
+			TraceKV:    traceKV,
 		},
 	)
 }
@@ -170,6 +172,7 @@ func (cb *ColumnBackfiller) InitForLocalUse(
 	desc catalog.TableDescriptor,
 	mon *mon.BytesMonitor,
 	rowMetrics *rowinfra.Metrics,
+	traceKV bool,
 ) error {
 	cb.initCols(desc)
 	defaultExprs, err := schemaexpr.MakeDefaultExprs(
@@ -190,7 +193,7 @@ func (cb *ColumnBackfiller) InitForLocalUse(
 	if err != nil {
 		return err
 	}
-	return cb.init(evalCtx, defaultExprs, computedExprs, desc, mon, rowMetrics)
+	return cb.init(evalCtx, defaultExprs, computedExprs, desc, mon, rowMetrics, traceKV)
 }

 // InitForDistributedUse initializes a ColumnBackfiller for use as part of a
@@ -247,7 +250,7 @@ func (cb *ColumnBackfiller) InitForDistributedUse(
 	flowCtx.Descriptors.ReleaseAll(ctx)

 	rowMetrics := flowCtx.GetRowMetrics()
-	return cb.init(evalCtx, defaultExprs, computedExprs, desc, mon, rowMetrics)
+	return cb.init(evalCtx, defaultExprs, computedExprs, desc, mon, rowMetrics, flowCtx.TraceKV)
 }

 // Close frees the resources used by the ColumnBackfiller.
@@ -310,7 +313,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
 	if err := cb.fetcher.StartScan(
 		ctx, txn, []roachpb.Span{sp}, nil, /* spanIDs */
 		rowinfra.GetDefaultBatchBytesLimit(false /* forceProductionValue */),
-		chunkSize, traceKV, false, /* forceProductionKVBatchSize */
+		chunkSize,
 	); err != nil {
 		log.Errorf(ctx, "scan error: %s", err)
 		return roachpb.Key{}, err
@@ -804,6 +807,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
 			Alloc:      &ib.alloc,
 			MemMonitor: ib.mon,
 			Spec:       &spec,
+			TraceKV:    traceKV,
 		},
 	); err != nil {
 		return nil, nil, 0, err
@@ -812,7 +816,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
 	if err := fetcher.StartScan(
 		ctx, txn, []roachpb.Span{sp}, nil, /* spanIDs */
 		rowinfra.GetDefaultBatchBytesLimit(false /* forceProductionValue */),
-		initBufferSize, traceKV, false, /* forceProductionKVBatchSize */
+		initBufferSize,
 	); err != nil {
 		log.Errorf(ctx, "scan error: %s", err)
 		return nil, nil, 0, err
diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go
index fefbd7370509..b0df0033999f 100644
--- a/pkg/sql/colfetcher/cfetcher.go
+++ b/pkg/sql/colfetcher/cfetcher.go
@@ -185,7 +185,8 @@ type cFetcherArgs struct {
 	reverse bool
 	// traceKV indicates whether or not session tracing is enabled. It is set
 	// when initializing the fetcher.
-	traceKV bool
+	traceKV                    bool
+	forceProductionKVBatchSize bool
 }

 // noOutputColumn is a sentinel value to denote that a system column is not
@@ -511,7 +512,6 @@ func (cf *cFetcher) StartScan(
 	limitBatches bool,
 	batchBytesLimit rowinfra.BytesLimit,
 	limitHint rowinfra.RowLimit,
-	forceProductionKVBatchSize bool,
 ) error {
 	if len(spans) == 0 {
 		return errors.AssertionFailedf("no spans")
@@ -558,7 +558,7 @@ func (cf *cFetcher) StartScan(
 		cf.lockWaitPolicy,
 		cf.lockTimeout,
 		cf.kvFetcherMemAcc,
-		forceProductionKVBatchSize,
+		cf.forceProductionKVBatchSize,
 	)
 	if err != nil {
 		return err
diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go
index 8a081d8b411c..a9829c1987fe 100644
--- a/pkg/sql/colfetcher/colbatch_scan.go
+++ b/pkg/sql/colfetcher/colbatch_scan.go
@@ -98,7 +98,6 @@ func (s *ColBatchScan) Init(ctx context.Context) {
 		limitBatches,
 		s.batchBytesLimit,
 		s.limitHint,
-		s.flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
 	); err != nil {
 		colexecerror.InternalError(err)
 	}
@@ -204,6 +203,7 @@ func NewColBatchScan(
 		estimatedRowCount,
 		spec.Reverse,
 		flowCtx.TraceKV,
+		flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
 	}

 	if err = fetcher.Init(allocator, kvFetcherMemAcc, tableArgs); err != nil {
diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go
index 4e26e655ab4b..bc79ba040001 100644
--- a/pkg/sql/colfetcher/index_join.go
+++ b/pkg/sql/colfetcher/index_join.go
@@ -260,7 +260,6 @@ func (s *ColIndexJoin) Next() coldata.Batch {
 				false, /* limitBatches */
 				rowinfra.NoBytesLimit,
 				rowinfra.NoRowLimit,
-				s.flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
 			)
 		}
 		if err != nil {
@@ -552,6 +551,7 @@ func NewColIndexJoin(
 		0,     /* estimatedRowCount */
 		false, /* reverse */
 		flowCtx.TraceKV,
+		flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
 	}
 	if err = fetcher.Init(
 		fetcherAllocator, kvFetcherMemAcc, tableArgs,
diff --git a/pkg/sql/delete_preserving_index_test.go b/pkg/sql/delete_preserving_index_test.go
index 4cb1bfe4771e..73ba6b4ef386 100644
--- a/pkg/sql/delete_preserving_index_test.go
+++ b/pkg/sql/delete_preserving_index_test.go
@@ -782,11 +782,12 @@ func fetchIndex(
 			Alloc:      &alloc,
 			MemMonitor: mm.Monitor(),
 			Spec:       &spec,
+			TraceKV:    true,
 		},
 	))

 	require.NoError(t, fetcher.StartScan(
-		ctx, txn, spans, nil /* spanIDs */, rowinfra.NoBytesLimit, 0, true, false, /* forceProductionBatchSize */
+		ctx, txn, spans, nil /* spanIDs */, rowinfra.NoBytesLimit, 0,
 	))
 	var rows []tree.Datums
 	for {
diff --git a/pkg/sql/indexbackfiller_test.go b/pkg/sql/indexbackfiller_test.go
index 84b1b3f9e5b5..447506e4d618 100644
--- a/pkg/sql/indexbackfiller_test.go
+++ b/pkg/sql/indexbackfiller_test.go
@@ -412,11 +412,12 @@ INSERT INTO foo VALUES (1), (10), (100);
 			Alloc:      &alloc,
 			MemMonitor: mm.Monitor(),
 			Spec:       &spec,
+			TraceKV:    true,
 		},
 	))

 	require.NoError(t, fetcher.StartScan(
-		ctx, txn, spans, nil /* spanIDs */, rowinfra.NoBytesLimit, 0, true, false, /* forceProductionBatchSize */
+		ctx, txn, spans, nil /* spanIDs */, rowinfra.NoBytesLimit, 0,
 	))
 	var rows []tree.Datums
 	for {
diff --git a/pkg/sql/row/errors.go b/pkg/sql/row/errors.go
index 49a0ca1bf0a0..c798e325b145 100644
--- a/pkg/sql/row/errors.go
+++ b/pkg/sql/row/errors.go
@@ -292,7 +292,7 @@ func DecodeRowInfo(
 	}
 	// Use the Fetcher to decode the single kv pair above by passing in
 	// this singleKVFetcher implementation, which doesn't actually hit KV.
-	if err := rf.StartScanFrom(ctx, &f, false /* traceKV */); err != nil {
+	if err := rf.StartScanFrom(ctx, &f); err != nil {
 		return nil, nil, nil, err
 	}
 	datums, err := rf.NextRowDecoded(ctx)
diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go
index 8f8bb464494f..816bc67a0a30 100644
--- a/pkg/sql/row/fetcher.go
+++ b/pkg/sql/row/fetcher.go
@@ -149,33 +149,13 @@ type tableInfo struct {
 //	// Process res.row
 //	}
 type Fetcher struct {
+	args  FetcherInitArgs
 	table tableInfo

-	// reverse denotes whether or not the spans should be read in reverse
-	// or not when StartScan is invoked.
-	reverse bool
-
 	// True if the index key must be decoded. This is only false if there are no
 	// needed columns.
 	mustDecodeIndexKey bool

-	// lockStrength represents the row-level locking mode to use when fetching
-	// rows.
-	lockStrength descpb.ScanLockingStrength
-
-	// lockWaitPolicy represents the policy to be used for handling conflicting
-	// locks held by other active transactions.
-	lockWaitPolicy descpb.ScanLockingWaitPolicy
-
-	// lockTimeout specifies the maximum amount of time that the fetcher will
-	// wait while attempting to acquire a lock on a key or while blocking on an
-	// existing lock in order to perform a non-locking read on a key.
-	lockTimeout time.Duration
-
-	// traceKV indicates whether or not session tracing is enabled. It is set
-	// when beginning a new scan.
-	traceKV bool
-
 	// mvccDecodeStrategy controls whether or not MVCC timestamps should
 	// be decoded from KV's fetched.
 	mvccDecodeStrategy MVCCDecodingStrategy
@@ -202,9 +182,6 @@ type Fetcher struct {
 	// columns and is only used for decoding for error messages or debugging.
 	IgnoreUnexpectedNulls bool

-	// Buffered allocation of decoded datums.
-	alloc *tree.DatumAlloc
-
 	// Memory monitor and memory account for the bytes fetched by this fetcher.
 	mon             *mon.BytesMonitor
 	kvFetcherMemAcc *mon.BoundAccount
@@ -233,13 +210,26 @@ func (rf *Fetcher) Close(ctx context.Context) {

 // FetcherInitArgs contains arguments for Fetcher.Init.
 type FetcherInitArgs struct {
-	Reverse        bool
-	LockStrength   descpb.ScanLockingStrength
+	// Reverse denotes whether or not the spans should be read in reverse or not
+	// when StartScan* methods are invoked.
+	Reverse bool
+	// LockStrength represents the row-level locking mode to use when fetching
+	// rows.
+	LockStrength descpb.ScanLockingStrength
+	// LockWaitPolicy represents the policy to be used for handling conflicting
+	// locks held by other active transactions.
 	LockWaitPolicy descpb.ScanLockingWaitPolicy
-	LockTimeout    time.Duration
-	Alloc          *tree.DatumAlloc
-	MemMonitor     *mon.BytesMonitor
-	Spec           *descpb.IndexFetchSpec
+	// LockTimeout specifies the maximum amount of time that the fetcher will
+	// wait while attempting to acquire a lock on a key or while blocking on an
+	// existing lock in order to perform a non-locking read on a key.
+	LockTimeout time.Duration
+	// Alloc is used for buffered allocation of decoded datums.
+	Alloc      *tree.DatumAlloc
+	MemMonitor *mon.BytesMonitor
+	Spec       *descpb.IndexFetchSpec
+	// TraceKV indicates whether or not session tracing is enabled.
+	TraceKV                    bool
+	ForceProductionKVBatchSize bool
 }

 // Init sets up a Fetcher for a given table and index.
@@ -247,11 +237,8 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error {
 	if args.Spec.Version != descpb.IndexFetchSpecVersionInitial {
 		return errors.Newf("unsupported IndexFetchSpec version %d", args.Spec.Version)
 	}
-	rf.reverse = args.Reverse
-	rf.lockStrength = args.LockStrength
-	rf.lockWaitPolicy = args.LockWaitPolicy
-	rf.lockTimeout = args.LockTimeout
-	rf.alloc = args.Alloc
+
+	rf.args = args

 	if args.MemMonitor != nil {
 		rf.mon = mon.NewMonitorInheritWithLimit("fetcher-mem", 0 /* limit */, args.MemMonitor)
@@ -398,8 +385,6 @@ func (rf *Fetcher) StartScan(
 	spanIDs []int,
 	batchBytesLimit rowinfra.BytesLimit,
 	rowLimitHint rowinfra.RowLimit,
-	traceKV bool,
-	forceProductionKVBatchSize bool,
 ) error {
 	if len(spans) == 0 {
 		return errors.AssertionFailedf("no spans")
@@ -411,14 +396,14 @@ func (rf *Fetcher) StartScan(
 			sendFn:                     makeKVBatchFetcherDefaultSendFunc(txn),
 			spans:                      spans,
 			spanIDs:                    spanIDs,
-			reverse:                    rf.reverse,
+			reverse:                    rf.args.Reverse,
 			batchBytesLimit:            batchBytesLimit,
 			firstBatchKeyLimit:         rf.rowLimitToKeyLimit(rowLimitHint),
-			lockStrength:               rf.lockStrength,
-			lockWaitPolicy:             rf.lockWaitPolicy,
-			lockTimeout:                rf.lockTimeout,
+			lockStrength:               rf.args.LockStrength,
+			lockWaitPolicy:             rf.args.LockWaitPolicy,
+			lockTimeout:                rf.args.LockTimeout,
 			acc:                        rf.kvFetcherMemAcc,
-			forceProductionKVBatchSize: forceProductionKVBatchSize,
+			forceProductionKVBatchSize: rf.args.ForceProductionKVBatchSize,
 			requestAdmissionHeader:     txn.AdmissionHeader(),
 			responseAdmissionQ:         txn.DB().SQLKVResponseAdmissionQ,
 		},
@@ -426,7 +411,7 @@ func (rf *Fetcher) StartScan(
 	if err != nil {
 		return err
 	}
-	return rf.StartScanFrom(ctx, &f, traceKV)
+	return rf.StartScanFrom(ctx, &f)
 }

 // TestingInconsistentScanSleep introduces a sleep inside the fetcher after
@@ -453,8 +438,6 @@ func (rf *Fetcher) StartInconsistentScan(
 	spans roachpb.Spans,
 	batchBytesLimit rowinfra.BytesLimit,
 	rowLimitHint rowinfra.RowLimit,
-	traceKV bool,
-	forceProductionKVBatchSize bool,
 	qualityOfService sessiondatapb.QoSLevel,
 ) error {
 	if len(spans) == 0 {
@@ -515,14 +498,14 @@ func (rf *Fetcher) StartInconsistentScan(
 			sendFn:                     sendFn,
 			spans:                      spans,
 			spanIDs:                    nil,
-			reverse:                    rf.reverse,
+			reverse:                    rf.args.Reverse,
 			batchBytesLimit:            batchBytesLimit,
 			firstBatchKeyLimit:         rf.rowLimitToKeyLimit(rowLimitHint),
-			lockStrength:               rf.lockStrength,
-			lockWaitPolicy:             rf.lockWaitPolicy,
-			lockTimeout:                rf.lockTimeout,
+			lockStrength:               rf.args.LockStrength,
+			lockWaitPolicy:             rf.args.LockWaitPolicy,
+			lockTimeout:                rf.args.LockTimeout,
 			acc:                        rf.kvFetcherMemAcc,
-			forceProductionKVBatchSize: forceProductionKVBatchSize,
+			forceProductionKVBatchSize: rf.args.ForceProductionKVBatchSize,
 			requestAdmissionHeader:     txn.AdmissionHeader(),
 			responseAdmissionQ:         txn.DB().SQLKVResponseAdmissionQ,
 		},
@@ -530,7 +513,7 @@ func (rf *Fetcher) StartInconsistentScan(
 	if err != nil {
 		return err
 	}
-	return rf.StartScanFrom(ctx, &f, traceKV)
+	return rf.StartScanFrom(ctx, &f)
 }

 func (rf *Fetcher) rowLimitToKeyLimit(rowLimitHint rowinfra.RowLimit) rowinfra.KeyLimit {
@@ -550,8 +533,7 @@ func (rf *Fetcher) rowLimitToKeyLimit(rowLimitHint rowinfra.RowLimit) rowinfra.K

 // StartScanFrom initializes and starts a scan from the given KVBatchFetcher. Can be
 // used multiple times.
-func (rf *Fetcher) StartScanFrom(ctx context.Context, f KVBatchFetcher, traceKV bool) error {
-	rf.traceKV = traceKV
+func (rf *Fetcher) StartScanFrom(ctx context.Context, f KVBatchFetcher) error {
 	rf.indexKey = nil
 	if rf.kvFetcher != nil {
 		rf.kvFetcher.Close(ctx)
@@ -668,7 +650,7 @@ func (rf *Fetcher) prettyKeyDatums(
 	var buf strings.Builder
 	for i, v := range vals {
 		buf.WriteByte('/')
-		if err := v.EnsureDecoded(cols[i].Type, rf.alloc); err != nil {
+		if err := v.EnsureDecoded(cols[i].Type, rf.args.Alloc); err != nil {
 			buf.WriteByte('?')
 		} else {
 			buf.WriteString(v.Datum.String())
@@ -692,7 +674,7 @@ func (rf *Fetcher) processKV(
 ) (prettyKey string, prettyValue string, err error) {
 	table := &rf.table

-	if rf.traceKV {
+	if rf.args.TraceKV {
 		prettyKey = fmt.Sprintf(
 			"/%s/%s%s",
 			table.spec.TableName,
@@ -744,7 +726,7 @@ func (rf *Fetcher) processKV(

 	if len(table.spec.FetchedColumns) == 0 {
 		// We don't need to decode any values.
-		if rf.traceKV {
+		if rf.args.TraceKV {
 			prettyValue = "<undecoded>"
 		}
 		return prettyKey, prettyValue, nil
@@ -832,7 +814,7 @@ func (rf *Fetcher) processKV(
 				table.row[idx] = table.extraVals[i]
 			}
 		}
-		if rf.traceKV {
+		if rf.args.TraceKV {
 			prettyValue = rf.prettyKeyDatums(extraCols, table.extraVals)
 		}
 	}
@@ -853,7 +835,7 @@ func (rf *Fetcher) processKV(
 		}
 	}

-	if rf.traceKV && prettyValue == "" {
+	if rf.args.TraceKV && prettyValue == "" {
 		prettyValue = "<undecoded>"
 	}

@@ -880,7 +862,7 @@ func (rf *Fetcher) processValueSingle(
 		return prettyKey, "", nil
 	}

-	if rf.traceKV {
+	if rf.args.TraceKV {
 		prettyKey = fmt.Sprintf("%s/%s", prettyKey, table.spec.FetchedColumns[idx].Name)
 	}
 	if len(kv.Value.RawBytes) == 0 {
@@ -892,11 +874,11 @@ func (rf *Fetcher) processValueSingle(
 	// although that would require changing UnmarshalColumnValue to operate
 	// on bytes, and for Encode/DecodeTableValue to operate on marshaled
 	// single values.
-	value, err := valueside.UnmarshalLegacy(rf.alloc, typ, kv.Value)
+	value, err := valueside.UnmarshalLegacy(rf.args.Alloc, typ, kv.Value)
 	if err != nil {
 		return "", "", err
 	}
-	if rf.traceKV {
+	if rf.args.TraceKV {
 		prettyValue = value.String()
 	}
 	table.row[idx] = rowenc.DatumToEncDatum(typ, value)
@@ -914,7 +896,7 @@ func (rf *Fetcher) processValueBytes(
 	prettyKeyPrefix string,
 ) (prettyKey string, prettyValue string, err error) {
 	prettyKey = prettyKeyPrefix
-	if rf.traceKV {
+	if rf.args.TraceKV {
 		if rf.prettyValueBuf == nil {
 			rf.prettyValueBuf = &bytes.Buffer{}
 		}
@@ -946,7 +928,7 @@ func (rf *Fetcher) processValueBytes(
 			continue
 		}

-		if rf.traceKV {
+		if rf.args.TraceKV {
 			prettyKey = fmt.Sprintf("%s/%s", prettyKey, table.spec.FetchedColumns[idx].Name)
 		}

@@ -956,8 +938,8 @@ func (rf *Fetcher) processValueBytes(
 		if err != nil {
 			return "", "", err
 		}
-		if rf.traceKV {
-			err := encValue.EnsureDecoded(table.spec.FetchedColumns[idx].Type, rf.alloc)
+		if rf.args.TraceKV {
+			err := encValue.EnsureDecoded(table.spec.FetchedColumns[idx].Type, rf.args.Alloc)
 			if err != nil {
 				return "", "", err
 			}
@@ -969,7 +951,7 @@ func (rf *Fetcher) processValueBytes(
 			log.Infof(ctx, "Scan %d -> %v", idx, encValue)
 		}
 	}
-	if rf.traceKV {
+	if rf.args.TraceKV {
 		prettyValue = rf.prettyValueBuf.String()
 	}
 	return prettyKey, prettyValue, nil
@@ -999,7 +981,7 @@ func (rf *Fetcher) NextRow(ctx context.Context) (row rowenc.EncDatumRow, spanID
 		if err != nil {
 			return nil, 0, err
 		}
-		if rf.traceKV {
+		if rf.args.TraceKV {
 			log.VEventf(ctx, 2, "fetched: %s -> %s", prettyKey, prettyVal)
 		}

@@ -1059,7 +1041,7 @@ func (rf *Fetcher) NextRowDecoded(ctx context.Context) (datums tree.Datums, err
 			rf.table.decodedRow[i] = tree.DNull
 			continue
 		}
-		if err := encDatum.EnsureDecoded(rf.table.spec.FetchedColumns[i].Type, rf.alloc); err != nil {
+		if err := encDatum.EnsureDecoded(rf.table.spec.FetchedColumns[i].Type, rf.args.Alloc); err != nil {
 			return nil, err
 		}
 		rf.table.decodedRow[i] = encDatum.Datum
@@ -1099,7 +1081,7 @@ func (rf *Fetcher) NextRowDecodedInto(
 			destination[ord] = tree.DNull
 			continue
 		}
-		if err := encDatum.EnsureDecoded(col.Type, rf.alloc); err != nil {
+		if err := encDatum.EnsureDecoded(col.Type, rf.args.Alloc); err != nil {
 			return false, err
 		}
 		destination[ord] = encDatum.Datum
@@ -1130,7 +1112,7 @@ func (rf *Fetcher) finalizeRow() error {
 		// TODO (rohany): Datums are immutable, so we can't store a DDecimal on the
 		// fetcher and change its contents with each row. If that assumption gets
 		// lifted, then we can avoid an allocation of a new decimal datum here.
-		dec := rf.alloc.NewDDecimal(tree.DDecimal{Decimal: eval.TimestampToDecimal(rf.RowLastModified())})
+		dec := rf.args.Alloc.NewDDecimal(tree.DDecimal{Decimal: eval.TimestampToDecimal(rf.RowLastModified())})
 		table.row[table.timestampOutputIdx] = rowenc.EncDatum{Datum: dec}
 	}
 	if table.oidOutputIdx != noOutputColumn {
diff --git a/pkg/sql/row/fetcher_mvcc_test.go b/pkg/sql/row/fetcher_mvcc_test.go
index 5d108dbd2771..e1dcb6826525 100644
--- a/pkg/sql/row/fetcher_mvcc_test.go
+++ b/pkg/sql/row/fetcher_mvcc_test.go
@@ -114,7 +114,7 @@ func TestRowFetcherMVCCMetadata(t *testing.T) {
 			log.Infof(ctx, "%v %v %v", kv.Key, kv.Value.Timestamp, kv.Value.PrettyPrint())
 		}

-		if err := rf.StartScanFrom(ctx, &row.SpanKVFetcher{KVs: kvs}, false /* traceKV */); err != nil {
+		if err := rf.StartScanFrom(ctx, &row.SpanKVFetcher{KVs: kvs}); err != nil {
 			t.Fatal(err)
 		}
 		var rows []rowWithMVCCMetadata
diff --git a/pkg/sql/row/fetcher_test.go b/pkg/sql/row/fetcher_test.go
index 380a9395310b..4e6de73c1405 100644
--- a/pkg/sql/row/fetcher_test.go
+++ b/pkg/sql/row/fetcher_test.go
@@ -156,8 +156,6 @@ func TestNextRowSingle(t *testing.T) {
 				nil, /* spanIDs */
 				rowinfra.NoBytesLimit,
 				rowinfra.NoRowLimit,
-				false, /*traceKV*/
-				false, /*forceProductionKVBatchSize*/
 			); err != nil {
 				t.Fatal(err)
 			}
@@ -260,9 +258,7 @@ func TestNextRowBatchLimiting(t *testing.T) {
 		roachpb.Spans{tableDesc.IndexSpan(keys.SystemSQLCodec, tableDesc.GetPrimaryIndexID())},
 		nil, /* spanIDs */
 		rowinfra.GetDefaultBatchBytesLimit(false /* forceProductionValue */),
-		10,    /*limitHint*/
-		false, /*traceKV*/
-		false, /*forceProductionKVBatchSize*/
+		10, /*limitHint*/
 	); err != nil {
 		t.Fatal(err)
 	}
@@ -356,8 +352,6 @@ func TestRowFetcherMemoryLimits(t *testing.T) {
 		nil, /* spanIDs */
 		rowinfra.NoBytesLimit,
 		rowinfra.NoRowLimit,
-		false, /*traceKV*/
-		false, /*forceProductionKVBatchSize*/
 	)
 	assert.Error(t, err)
 	assert.Equal(t, pgerror.GetPGCode(err), pgcode.OutOfMemory)
@@ -435,9 +429,7 @@ INDEX(c)
 		rowinfra.GetDefaultBatchBytesLimit(false /* forceProductionValue */),
 		// Set a limitHint of 1 to more quickly end the first batch, causing a
 		// batch that ends between rows.
-		1,     /*limitHint*/
-		false, /*traceKV*/
-		false, /*forceProductionKVBatchSize*/
+		1, /*limitHint*/
 	); err != nil {
 		t.Fatal(err)
 	}
@@ -593,8 +585,6 @@ func TestNextRowSecondaryIndex(t *testing.T) {
 				nil, /* spanIDs */
 				rowinfra.NoBytesLimit,
 				rowinfra.NoRowLimit,
-				false, /*traceKV*/
-				false, /*forceProductionKVBatchSize*/
 			); err != nil {
 				t.Fatal(err)
 			}
diff --git a/pkg/sql/rowexec/columnbackfiller.go b/pkg/sql/rowexec/columnbackfiller.go
index 21cd737b78ac..af0e05c98f1f 100644
--- a/pkg/sql/rowexec/columnbackfiller.go
+++ b/pkg/sql/rowexec/columnbackfiller.go
@@ -78,8 +78,9 @@ func newColumnBackfiller(
 	}
 	cb.backfiller.chunks = cb

-	if err := cb.ColumnBackfiller.InitForDistributedUse(ctx, flowCtx, cb.desc,
-		columnBackfillerMon); err != nil {
+	if err := cb.ColumnBackfiller.InitForDistributedUse(
+		ctx, flowCtx, cb.desc, columnBackfillerMon,
+	); err != nil {
 		return nil, err
 	}

diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go
index 4a4379c0dd0e..bb170124c5f4 100644
--- a/pkg/sql/rowexec/inverted_joiner.go
+++ b/pkg/sql/rowexec/inverted_joiner.go
@@ -291,12 +291,14 @@ func newInvertedJoiner(
 	if err := fetcher.Init(
 		flowCtx.EvalCtx.Context,
 		row.FetcherInitArgs{
-			LockStrength:   spec.LockingStrength,
-			LockWaitPolicy: spec.LockingWaitPolicy,
-			LockTimeout:    flowCtx.EvalCtx.SessionData().LockTimeout,
-			Alloc:          &ij.alloc,
-			MemMonitor:     flowCtx.EvalCtx.Mon,
-			Spec:           &spec.FetchSpec,
+			LockStrength:               spec.LockingStrength,
+			LockWaitPolicy:             spec.LockingWaitPolicy,
+			LockTimeout:                flowCtx.EvalCtx.SessionData().LockTimeout,
+			Alloc:                      &ij.alloc,
+			MemMonitor:                 flowCtx.EvalCtx.Mon,
+			Spec:                       &spec.FetchSpec,
+			TraceKV:                    flowCtx.TraceKV,
+			ForceProductionKVBatchSize: flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
 		},
 	); err != nil {
 		return nil, err
@@ -496,7 +498,7 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce
 	if err = ij.fetcher.StartScan(
 		ij.Ctx, ij.FlowCtx.Txn, ij.indexSpans, nil, /* spanIDs */
 		rowinfra.NoBytesLimit, rowinfra.NoRowLimit,
-		ij.FlowCtx.TraceKV, ij.EvalCtx.TestingKnobs.ForceProductionValues,
 	); err != nil {
 		ij.MoveToDraining(err)
 		return ijStateUnknown, ij.DrainHelper()
diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go
index 21f0ba42ea51..9a56e8680c78 100644
--- a/pkg/sql/rowexec/joinreader.go
+++ b/pkg/sql/rowexec/joinreader.go
@@ -400,12 +400,14 @@ func newJoinReader(
 	if err := fetcher.Init(
 		flowCtx.EvalCtx.Context,
 		row.FetcherInitArgs{
-			LockStrength:   spec.LockingStrength,
-			LockWaitPolicy: spec.LockingWaitPolicy,
-			LockTimeout:    flowCtx.EvalCtx.SessionData().LockTimeout,
-			Alloc:          &jr.alloc,
-			MemMonitor:     flowCtx.EvalCtx.Mon,
-			Spec:           &spec.FetchSpec,
+			LockStrength:               spec.LockingStrength,
+			LockWaitPolicy:             spec.LockingWaitPolicy,
+			LockTimeout:                flowCtx.EvalCtx.SessionData().LockTimeout,
+			Alloc:                      &jr.alloc,
+			MemMonitor:                 flowCtx.EvalCtx.Mon,
+			Spec:                       &spec.FetchSpec,
+			TraceKV:                    flowCtx.TraceKV,
+			ForceProductionKVBatchSize: flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
 		},
 	); err != nil {
 		return nil, err
@@ -948,7 +950,7 @@ func (jr *joinReader) readInput() (
 			jr.MoveToDraining(err)
 			return jrStateUnknown, nil, jr.DrainHelper()
 		}
-		err = jr.fetcher.StartScanFrom(jr.Ctx, kvBatchFetcher, jr.FlowCtx.TraceKV)
+		err = jr.fetcher.StartScanFrom(jr.Ctx, kvBatchFetcher)
 	} else {
 		var bytesLimit rowinfra.BytesLimit
 		if !jr.shouldLimitBatches {
@@ -961,7 +963,6 @@
 		}
 		err = jr.fetcher.StartScan(
 			jr.Ctx, jr.txn, spans, spanIDs, bytesLimit, rowinfra.NoRowLimit,
-			jr.FlowCtx.TraceKV, jr.EvalCtx.TestingKnobs.ForceProductionValues,
 		)
 	}
 	if err != nil {
@@ -1028,7 +1029,6 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet
 	}
 	if err := jr.fetcher.StartScan(
 		jr.Ctx, jr.txn, spans, spanIDs, bytesLimit, rowinfra.NoRowLimit,
-		jr.FlowCtx.TraceKV, jr.EvalCtx.TestingKnobs.ForceProductionValues,
 	); err != nil {
 		jr.MoveToDraining(err)
 		return jrStateUnknown, jr.DrainHelper()
diff --git a/pkg/sql/rowexec/rowfetcher.go b/pkg/sql/rowexec/rowfetcher.go
index 5fc311863888..16db8669c924 100644
--- a/pkg/sql/rowexec/rowfetcher.go
+++ b/pkg/sql/rowexec/rowfetcher.go
@@ -28,10 +28,10 @@ import (
 // collector wrapper can be plugged in.
 type rowFetcher interface {
 	StartScan(
-		_ context.Context, _ *kv.Txn, _ roachpb.Spans, spanIDs []int, batchBytesLimit rowinfra.BytesLimit,
-		rowLimitHint rowinfra.RowLimit, traceKV bool, forceProductionKVBatchSize bool,
+		_ context.Context, _ *kv.Txn, _ roachpb.Spans, spanIDs []int,
+		batchBytesLimit rowinfra.BytesLimit, rowLimitHint rowinfra.RowLimit,
 	) error
-	StartScanFrom(_ context.Context, _ row.KVBatchFetcher, traceKV bool) error
+	StartScanFrom(_ context.Context, _ row.KVBatchFetcher) error
 	StartInconsistentScan(
 		_ context.Context,
 		_ *kv.DB,
@@ -40,8 +40,6 @@ type rowFetcher interface {
 		spans roachpb.Spans,
 		batchBytesLimit rowinfra.BytesLimit,
 		rowLimitHint rowinfra.RowLimit,
-		traceKV bool,
-		forceProductionKVBatchSize bool,
 		qualityOfService sessiondatapb.QoSLevel,
 	) error

diff --git a/pkg/sql/rowexec/stats.go b/pkg/sql/rowexec/stats.go
index c92bc22f2185..48ee23a378dc 100644
--- a/pkg/sql/rowexec/stats.go
+++ b/pkg/sql/rowexec/stats.go
@@ -99,21 +99,17 @@ func (c *rowFetcherStatCollector) StartScan(
 	spanIDs []int,
 	batchBytesLimit rowinfra.BytesLimit,
 	limitHint rowinfra.RowLimit,
-	traceKV bool,
-	forceProductionKVBatchSize bool,
 ) error {
 	start := timeutil.Now()
-	err := c.fetcher.StartScan(ctx, txn, spans, spanIDs, batchBytesLimit, limitHint, traceKV, forceProductionKVBatchSize)
+	err := c.fetcher.StartScan(ctx, txn, spans, spanIDs, batchBytesLimit, limitHint)
 	c.startScanStallTime += timeutil.Since(start)
 	return err
 }

 // StartScanFrom is part of the rowFetcher interface.
-func (c *rowFetcherStatCollector) StartScanFrom(
-	ctx context.Context, f row.KVBatchFetcher, traceKV bool,
-) error {
+func (c *rowFetcherStatCollector) StartScanFrom(ctx context.Context, f row.KVBatchFetcher) error {
 	start := timeutil.Now()
-	err := c.fetcher.StartScanFrom(ctx, f, traceKV)
+	err := c.fetcher.StartScanFrom(ctx, f)
 	c.startScanStallTime += timeutil.Since(start)
 	return err
 }
@@ -127,14 +123,12 @@ func (c *rowFetcherStatCollector) StartInconsistentScan(
 	spans roachpb.Spans,
 	batchBytesLimit rowinfra.BytesLimit,
 	limitHint rowinfra.RowLimit,
-	traceKV bool,
-	forceProductionKVBatchSize bool,
 	qualityOfService sessiondatapb.QoSLevel,
 ) error {
 	start := timeutil.Now()
 	err := c.fetcher.StartInconsistentScan(
-		ctx, db, initialTimestamp, maxTimestampAge, spans, batchBytesLimit, limitHint, traceKV,
-		forceProductionKVBatchSize, qualityOfService,
+		ctx, db, initialTimestamp, maxTimestampAge, spans, batchBytesLimit,
+		limitHint, qualityOfService,
 	)
 	c.startScanStallTime += timeutil.Since(start)
 	return err
diff --git a/pkg/sql/rowexec/tablereader.go b/pkg/sql/rowexec/tablereader.go
index f121c890bfc1..d91380902ee3 100644
--- a/pkg/sql/rowexec/tablereader.go
+++ b/pkg/sql/rowexec/tablereader.go
@@ -148,13 +148,15 @@ func newTableReader(
 	if err := fetcher.Init(
 		flowCtx.EvalCtx.Context,
 		row.FetcherInitArgs{
-			Reverse:        spec.Reverse,
-			LockStrength:   spec.LockingStrength,
-			LockWaitPolicy: spec.LockingWaitPolicy,
-			LockTimeout:    flowCtx.EvalCtx.SessionData().LockTimeout,
-			Alloc:          &tr.alloc,
-			MemMonitor:     flowCtx.EvalCtx.Mon,
-			Spec:           &spec.FetchSpec,
+			Reverse:                    spec.Reverse,
+			LockStrength:               spec.LockingStrength,
+			LockWaitPolicy:             spec.LockingWaitPolicy,
+			LockTimeout:                flowCtx.EvalCtx.SessionData().LockTimeout,
+			Alloc:                      &tr.alloc,
+			MemMonitor:                 flowCtx.EvalCtx.Mon,
+			Spec:                       &spec.FetchSpec,
+			TraceKV:                    flowCtx.TraceKV,
+			ForceProductionKVBatchSize: flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
 		},
 	); err != nil {
 		return nil, err
@@ -209,17 +211,13 @@ func (tr *tableReader) startScan(ctx context.Context) error {
 	var err error
 	if tr.maxTimestampAge == 0 {
 		err = tr.fetcher.StartScan(
-			ctx, tr.FlowCtx.Txn, tr.Spans, nil /* spanIDs */, bytesLimit,
-			tr.limitHint, tr.FlowCtx.TraceKV,
-			tr.EvalCtx.TestingKnobs.ForceProductionValues,
+			ctx, tr.FlowCtx.Txn, tr.Spans, nil /* spanIDs */, bytesLimit, tr.limitHint,
 		)
 	} else {
 		initialTS := tr.FlowCtx.Txn.ReadTimestamp()
 		err = tr.fetcher.StartInconsistentScan(
 			ctx, tr.FlowCtx.Cfg.DB, initialTS, tr.maxTimestampAge, tr.Spans,
-			bytesLimit, tr.limitHint, tr.FlowCtx.TraceKV,
-			tr.EvalCtx.TestingKnobs.ForceProductionValues,
-			tr.EvalCtx.QualityOfService(),
+			bytesLimit, tr.limitHint, tr.EvalCtx.QualityOfService(),
 		)
 	}
 	tr.scanStarted = true
diff --git a/pkg/sql/rowexec/zigzagjoiner.go b/pkg/sql/rowexec/zigzagjoiner.go
index 78edb539c1ad..df2d86cae951 100644
--- a/pkg/sql/rowexec/zigzagjoiner.go
+++ b/pkg/sql/rowexec/zigzagjoiner.go
@@ -457,12 +457,14 @@ func (z *zigzagJoiner) setupInfo(
 	if err := fetcher.Init(
 		flowCtx.EvalCtx.Context,
 		row.FetcherInitArgs{
-			LockStrength:   spec.LockingStrength,
-			LockWaitPolicy: spec.LockingWaitPolicy,
-			LockTimeout:    flowCtx.EvalCtx.SessionData().LockTimeout,
-			Alloc:          &info.alloc,
-			MemMonitor:     flowCtx.EvalCtx.Mon,
-			Spec:           &spec.FetchSpec,
+			LockStrength:               spec.LockingStrength,
+			LockWaitPolicy:             spec.LockingWaitPolicy,
+			LockTimeout:                flowCtx.EvalCtx.SessionData().LockTimeout,
+			Alloc:                      &info.alloc,
+			MemMonitor:                 flowCtx.EvalCtx.Mon,
+			Spec:                       &spec.FetchSpec,
+			TraceKV:                    flowCtx.TraceKV,
+			ForceProductionKVBatchSize: flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
 		},
 	); err != nil {
 		return err
@@ -650,8 +652,6 @@ func (z *zigzagJoiner) nextRow(ctx context.Context, txn *kv.Txn) (rowenc.EncDatu
 			nil, /* spanIDs */
 			rowinfra.GetDefaultBatchBytesLimit(z.EvalCtx.TestingKnobs.ForceProductionValues),
 			zigzagJoinerBatchSize,
-			z.FlowCtx.TraceKV,
-			z.EvalCtx.TestingKnobs.ForceProductionValues,
 		)
 		if err != nil {
 			return nil, err
@@ -793,8 +793,6 @@ func (z *zigzagJoiner) maybeFetchInitialRow() error {
 			nil, /* spanIDs */
 			rowinfra.GetDefaultBatchBytesLimit(z.EvalCtx.TestingKnobs.ForceProductionValues),
 			zigzagJoinerBatchSize,
-			z.FlowCtx.TraceKV,
-			z.EvalCtx.TestingKnobs.ForceProductionValues,
 		)
 		if err != nil {
 			log.Errorf(z.Ctx, "scan error: %s", err)
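
The net effect for callers of `row.Fetcher`: `TraceKV` and `ForceProductionKVBatchSize` are now fixed once in `row.FetcherInitArgs` at `Init` time, and `StartScan`, `StartScanFrom`, and `StartInconsistentScan` no longer take them as per-call arguments. Below is a minimal sketch of the new calling convention; the helper name and its parameters are illustrative only (not code from this patch), and the usual `row`, `rowinfra`, `kv`, `roachpb`, `descpb`, and `tree` imports are assumed to be in scope.

```go
// fetchAllRows is an illustrative helper (not part of this patch) showing the
// new calling convention: tracing and KV batch sizing are configured once via
// FetcherInitArgs, and StartScan no longer takes traceKV or
// forceProductionKVBatchSize parameters.
func fetchAllRows(
	ctx context.Context,
	txn *kv.Txn,
	spec descpb.IndexFetchSpec,
	spans roachpb.Spans,
	traceKV bool,
) error {
	var rf row.Fetcher
	if err := rf.Init(
		ctx,
		row.FetcherInitArgs{
			Alloc:   &tree.DatumAlloc{},
			Spec:    &spec,
			TraceKV: traceKV, // previously passed to every StartScan call
		},
	); err != nil {
		return err
	}
	defer rf.Close(ctx)

	if err := rf.StartScan(
		ctx, txn, spans, nil /* spanIDs */, rowinfra.NoBytesLimit, rowinfra.NoRowLimit,
	); err != nil {
		return err
	}
	for {
		datums, err := rf.NextRowDecoded(ctx)
		if err != nil {
			return err
		}
		if datums == nil {
			// No more rows.
			break
		}
		_ = datums // process the decoded row
	}
	return nil
}
```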