From f4f984b37f9ff4bcb5663787626a5386c7c6d323 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 16 Jun 2022 12:30:57 -0700 Subject: [PATCH] sql: clean up the lifecycle of fetchers Previously, the lifecycle of different fetcher objects was a mess. Consider the sequence of fetchers when used by the join reader with the old non-streamer code path: `rowexec.joinReader` -> `row.Fetcher` -> `row.KVFetcher` -> `row.txnKVFetcher`. `row.Fetcher` was initialized once, but then on every call to `StartScan`, we would create a new `row.txnKVFetcher` and then wrap it with a new `row.KVFetcher` (during an internal `StartScanFrom` call). In other words, throughout the lifetime of the join reader, its fetcher would create a new pair of objects for each input row batch. This setup is very unintuitive and previously led to some bugs with memory accounting. I believe such a setup was created organically, without giving too much thought to it. Some considerations should be pointed out: - in some cases, we have some state from the previous fetch that we want to discard - in some cases, we provide the `row.Fetcher` with a custom `KVBatchFetcher` implementation. This commit refactors all of this stuff to make it much more sane. In particular, we now only create a single `row.KVFetcher` object that is powered by a single `row.txnKVFetcher` or `row.txnKVStreamer` implementation throughout the whole lifetime of `row.Fetcher`. In the main code path, the callers are now expected to only use `StartScan` method which correctly discards unnecessary state from the previous call. This is achieved by adding a new method to `KVBatchFetcher` interface. This commit supports the use case with custom `KVBatchFetcher`s too by asking the caller to explicitly specify a knob during the initialization of the `row.Fetcher` - in such case, only `StartScanFrom` calls are allowed. There, we still close the `KVBatchFetcher` from the previous call (tbh I believe this is not necessary since these custom `KVBatchFetcher`s don't have anything to clean up, but it's probably safer to keep the old behavior here). Furthermore, this commit pushes some arguments from `StartScan` into `Init` - most notably the txn is now passed only once. However, there are some use cases (like a column backfill, done in chunks) where the txn might change throughout the lifetime of the fetcher - we allow updating it later if needed. This also allows us to unify the streamer and the non-streamer code paths - to remove some of the duplicated code as well as push the usage of the streamer lower in the stack. Release note: None --- pkg/BUILD.bazel | 1 + .../cdcevent/rowfetcher_cache.go | 5 +- pkg/ccl/cliccl/debug_backup.go | 5 +- pkg/sql/backfill.go | 8 +- pkg/sql/backfill/backfill.go | 23 ++- pkg/sql/colfetcher/BUILD.bazel | 1 - pkg/sql/colfetcher/cfetcher.go | 118 ++++---------- pkg/sql/colfetcher/colbatch_scan.go | 56 ++++--- pkg/sql/colfetcher/index_join.go | 139 +++++++--------- pkg/sql/delete_preserving_index_test.go | 3 +- pkg/sql/delete_range.go | 5 +- pkg/sql/indexbackfiller_test.go | 3 +- pkg/sql/row/BUILD.bazel | 8 + pkg/sql/row/errors.go | 12 +- pkg/sql/row/fetcher.go | 143 ++++++++++------ pkg/sql/row/fetcher_mvcc_test.go | 5 +- pkg/sql/row/fetcher_test.go | 28 ++-- pkg/sql/row/kv_batch_fetcher.go | 124 +++++++------- pkg/sql/row/kv_batch_streamer.go | 76 +++++---- pkg/sql/row/kv_fetcher.go | 123 ++++++++++---- pkg/sql/row/locking.go | 4 +- pkg/sql/rowexec/BUILD.bazel | 1 - pkg/sql/rowexec/indexbackfiller.go | 5 +- pkg/sql/rowexec/inverted_joiner.go | 3 +- pkg/sql/rowexec/joinreader.go | 152 +++++++----------- pkg/sql/rowexec/rowfetcher.go | 2 +- pkg/sql/rowexec/stats.go | 6 +- pkg/sql/rowexec/tablereader.go | 3 +- pkg/sql/rowexec/zigzagjoiner.go | 10 +- 29 files changed, 569 insertions(+), 503 deletions(-) diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 1d404bffb210..f3d074c580dc 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -368,6 +368,7 @@ ALL_TESTS = [ "//pkg/sql/protoreflect:protoreflect_test", "//pkg/sql/querycache:querycache_test", "//pkg/sql/randgen:randgen_test", + "//pkg/sql/row:row_disallowed_imports_test", "//pkg/sql/row:row_test", "//pkg/sql/rowcontainer:rowcontainer_test", "//pkg/sql/rowenc/keyside:keyside_test", diff --git a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go index 018c08cfa877..61a0a01ad293 100644 --- a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go @@ -239,8 +239,9 @@ func (c *rowFetcherCache) RowFetcherForColumnFamily( if err := rf.Init( context.TODO(), row.FetcherInitArgs{ - Alloc: &c.a, - Spec: &spec, + WillUseCustomKVFetcher: true, + Alloc: &c.a, + Spec: &spec, }, ); err != nil { return nil, nil, err diff --git a/pkg/ccl/cliccl/debug_backup.go b/pkg/ccl/cliccl/debug_backup.go index c8c4d4300485..b13397f18d06 100644 --- a/pkg/ccl/cliccl/debug_backup.go +++ b/pkg/ccl/cliccl/debug_backup.go @@ -627,8 +627,9 @@ func makeRowFetcher( if err := rf.Init( ctx, row.FetcherInitArgs{ - Alloc: &tree.DatumAlloc{}, - Spec: &spec, + WillUseCustomKVFetcher: true, + Alloc: &tree.DatumAlloc{}, + Spec: &spec, }, ); err != nil { return rf, err diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index a47907b39b30..831e8ea367de 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -2584,7 +2584,7 @@ func columnBackfillInTxn( rowMetrics := execCfg.GetRowMetrics(evalCtx.SessionData().Internal) var backfiller backfill.ColumnBackfiller if err := backfiller.InitForLocalUse( - ctx, evalCtx, semaCtx, tableDesc, columnBackfillerMon, rowMetrics, traceKV, + ctx, txn, evalCtx, semaCtx, tableDesc, columnBackfillerMon, rowMetrics, traceKV, ); err != nil { return err } @@ -2633,8 +2633,10 @@ func indexBackfillInTxn( sp := tableDesc.PrimaryIndexSpan(evalCtx.Codec) for sp.Key != nil { var err error - sp.Key, err = backfiller.RunIndexBackfillChunk(ctx, - txn, tableDesc, sp, indexTxnBackfillChunkSize, false /* alsoCommit */, traceKV) + sp.Key, err = backfiller.RunIndexBackfillChunk( + ctx, txn, tableDesc, sp, indexTxnBackfillChunkSize, + false /* alsoCommit */, traceKV, + ) if err != nil { return err } diff --git a/pkg/sql/backfill/backfill.go b/pkg/sql/backfill/backfill.go index acdb447290c4..320c01ce1602 100644 --- a/pkg/sql/backfill/backfill.go +++ b/pkg/sql/backfill/backfill.go @@ -105,7 +105,10 @@ func (cb *ColumnBackfiller) initCols(desc catalog.TableDescriptor) { // init performs initialization operations that are shared across the local // and distributed initialization procedures for the ColumnBackfiller. +// +// txn might be nil, in which case it will need to be set on the fetcher later. func (cb *ColumnBackfiller) init( + txn *kv.Txn, evalCtx *eval.Context, defaultExprs []tree.TypedExpr, computedExprs []tree.TypedExpr, @@ -154,6 +157,7 @@ func (cb *ColumnBackfiller) init( return cb.fetcher.Init( evalCtx.Context, row.FetcherInitArgs{ + Txn: txn, Alloc: &cb.alloc, MemMonitor: cb.mon, Spec: &spec, @@ -167,6 +171,7 @@ func (cb *ColumnBackfiller) init( // is occurring on the gateway as part of the user's transaction. func (cb *ColumnBackfiller) InitForLocalUse( ctx context.Context, + txn *kv.Txn, evalCtx *eval.Context, semaCtx *tree.SemaContext, desc catalog.TableDescriptor, @@ -193,7 +198,7 @@ func (cb *ColumnBackfiller) InitForLocalUse( if err != nil { return err } - return cb.init(evalCtx, defaultExprs, computedExprs, desc, mon, rowMetrics, traceKV) + return cb.init(txn, evalCtx, defaultExprs, computedExprs, desc, mon, rowMetrics, traceKV) } // InitForDistributedUse initializes a ColumnBackfiller for use as part of a @@ -250,7 +255,8 @@ func (cb *ColumnBackfiller) InitForDistributedUse( flowCtx.Descriptors.ReleaseAll(ctx) rowMetrics := flowCtx.GetRowMetrics() - return cb.init(evalCtx, defaultExprs, computedExprs, desc, mon, rowMetrics, flowCtx.TraceKV) + // The txn will be set on the fetcher in RunColumnBackfillChunk. + return cb.init(nil /* txn */, evalCtx, defaultExprs, computedExprs, desc, mon, rowMetrics, flowCtx.TraceKV) } // Close frees the resources used by the ColumnBackfiller. @@ -302,6 +308,9 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk( panic("only column data should be modified, but the rowUpdater is configured otherwise") } + // Update the fetcher to use the new txn. + cb.fetcher.SetTxn(txn) + // Get the next set of rows. // // Running the scan and applying the changes in many transactions @@ -311,7 +320,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk( // populated and deleted by the OLTP commands but not otherwise // read or used if err := cb.fetcher.StartScan( - ctx, txn, []roachpb.Span{sp}, nil, /* spanIDs */ + ctx, []roachpb.Span{sp}, nil, /* spanIDs */ rowinfra.GetDefaultBatchBytesLimit(false /* forceProductionValue */), chunkSize, ); err != nil { @@ -802,6 +811,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk( if err := fetcher.Init( ib.evalCtx.Context, row.FetcherInitArgs{ + Txn: txn, Alloc: &ib.alloc, MemMonitor: ib.mon, Spec: &spec, @@ -812,7 +822,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk( } defer fetcher.Close(ctx) if err := fetcher.StartScan( - ctx, txn, []roachpb.Span{sp}, nil, /* spanIDs */ + ctx, []roachpb.Span{sp}, nil, /* spanIDs */ rowinfra.GetDefaultBatchBytesLimit(false /* forceProductionValue */), initBufferSize, ); err != nil { @@ -976,8 +986,9 @@ func (ib *IndexBackfiller) RunIndexBackfillChunk( alsoCommit bool, traceKV bool, ) (roachpb.Key, error) { - entries, key, memUsedBuildingChunk, err := ib.BuildIndexEntriesChunk(ctx, txn, tableDesc, sp, - chunkSize, traceKV) + entries, key, memUsedBuildingChunk, err := ib.BuildIndexEntriesChunk( + ctx, txn, tableDesc, sp, chunkSize, traceKV, + ) if err != nil { return nil, err } diff --git a/pkg/sql/colfetcher/BUILD.bazel b/pkg/sql/colfetcher/BUILD.bazel index c21980b85b25..77aea1ce5e14 100644 --- a/pkg/sql/colfetcher/BUILD.bazel +++ b/pkg/sql/colfetcher/BUILD.bazel @@ -17,7 +17,6 @@ go_library( "//pkg/col/typeconv", "//pkg/keys", "//pkg/kv", - "//pkg/kv/kvclient/kvstreamer", "//pkg/roachpb", "//pkg/sql/catalog", "//pkg/sql/catalog/catpb", diff --git a/pkg/sql/colfetcher/cfetcher.go b/pkg/sql/colfetcher/cfetcher.go index ff77deb14007..24497bcf7649 100644 --- a/pkg/sql/colfetcher/cfetcher.go +++ b/pkg/sql/colfetcher/cfetcher.go @@ -17,13 +17,10 @@ import ( "sort" "strings" "sync" - "time" "github.com/cockroachdb/apd/v3" "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" @@ -44,7 +41,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/errors" ) @@ -164,28 +160,18 @@ func (m colIdxMap) Swap(i, j int) { } type cFetcherArgs struct { - // lockStrength represents the row-level locking mode to use when fetching - // rows. - lockStrength descpb.ScanLockingStrength - // lockWaitPolicy represents the policy to be used for handling conflicting - // locks held by other active transactions. - lockWaitPolicy descpb.ScanLockingWaitPolicy - // lockTimeout specifies the maximum amount of time that the fetcher will - // wait while attempting to acquire a lock on a key or while blocking on an - // existing lock in order to perform a non-locking read on a key. - lockTimeout time.Duration // memoryLimit determines the maximum memory footprint of the output batch. memoryLimit int64 // estimatedRowCount is the optimizer-derived number of expected rows that // this fetch will produce, if non-zero. estimatedRowCount uint64 - // reverse denotes whether or not the spans should be read in reverse or not - // when StartScan is invoked. - reverse bool // traceKV indicates whether or not session tracing is enabled. It is set // when initializing the fetcher. - traceKV bool - forceProductionKVBatchSize bool + traceKV bool + // singleUse, if true, indicates that the cFetcher will only need to scan a + // single set of spans. This allows the cFetcher to close itself eagerly, + // once it finishes the first fetch. + singleUse bool } // noOutputColumn is a sentinel value to denote that a system column is not @@ -229,7 +215,7 @@ type cFetcher struct { fetcher *row.KVFetcher // bytesRead stores the cumulative number of bytes read by this cFetcher // throughout its whole existence (i.e. between its construction and - // Release()). It accumulates the bytes read statistic across StartScan* and + // Release()). It accumulates the bytes read statistic across StartScan and // Close methods. // // The field should not be accessed directly by the users of the cFetcher - @@ -286,10 +272,6 @@ type cFetcher struct { accountingHelper colmem.SetAccountingHelper - // kvFetcherMemAcc is a memory account that will be used by the underlying - // KV fetcher. - kvFetcherMemAcc *mon.BoundAccount - // maxCapacity if non-zero indicates the target capacity of the output // batch. It is set when at the row finalization we realize that the output // batch has exceeded the memory limit. @@ -343,15 +325,14 @@ func (cf *cFetcher) resetBatch() { } } -// Init sets up a Fetcher based on the table args. Only columns present in +// Init sets up the cFetcher based on the table args. Only columns present in // tableArgs.cols will be fetched. func (cf *cFetcher) Init( - allocator *colmem.Allocator, kvFetcherMemAcc *mon.BoundAccount, tableArgs *cFetcherTableArgs, + allocator *colmem.Allocator, kvFetcher *row.KVFetcher, tableArgs *cFetcherTableArgs, ) error { if tableArgs.spec.Version != descpb.IndexFetchSpecVersionInitial { return errors.Newf("unsupported IndexFetchSpec version %d", tableArgs.spec.Version) } - cf.kvFetcherMemAcc = kvFetcherMemAcc table := newCTableInfo() nCols := tableArgs.ColIdxMap.Len() if cap(table.orderedColIdxMap.vals) < nCols { @@ -481,33 +462,22 @@ func (cf *cFetcher) Init( } cf.table = table + cf.fetcher = kvFetcher cf.accountingHelper.Init(allocator, cf.table.typs) return nil } -//gcassert:inline -func (cf *cFetcher) setFetcher(f *row.KVFetcher, limitHint rowinfra.RowLimit) { - cf.fetcher = f - cf.machine.lastRowPrefix = nil - cf.machine.limitHint = int(limitHint) - cf.machine.state[0] = stateResetBatch - cf.machine.state[1] = stateInitFetch -} - -// StartScan initializes and starts the key-value scan. Can be used multiple -// times. +// StartScan initializes and starts the key-value scan. Can only be used +// multiple times if cFetcherArgs.singleUse was set to false in Init(). // // The fetcher takes ownership of the spans slice - it can modify the slice and // will perform the memory accounting accordingly. The caller can only reuse the -// spans slice after the fetcher has been closed (which happens when the fetcher -// emits the first zero batch), and if the caller does, it becomes responsible -// for the memory accounting. +// spans slice after the fetcher emits a zero-length batch, and if the caller +// does, it becomes responsible for the memory accounting. func (cf *cFetcher) StartScan( ctx context.Context, - txn *kv.Txn, spans roachpb.Spans, - bsHeader *roachpb.BoundedStalenessHeader, limitBatches bool, batchBytesLimit rowinfra.BytesLimit, limitHint rowinfra.RowLimit, @@ -544,49 +514,13 @@ func (cf *cFetcher) StartScan( firstBatchLimit = rowinfra.KeyLimit(int(limitHint) * int(cf.table.spec.MaxKeysPerRow)) } - f, err := row.NewKVFetcher( - ctx, - txn, - spans, - nil, /* spanIDs */ - bsHeader, - cf.reverse, - batchBytesLimit, - firstBatchLimit, - cf.lockStrength, - cf.lockWaitPolicy, - cf.lockTimeout, - cf.kvFetcherMemAcc, - cf.forceProductionKVBatchSize, + cf.machine.lastRowPrefix = nil + cf.machine.limitHint = int(limitHint) + cf.machine.state[0] = stateResetBatch + cf.machine.state[1] = stateInitFetch + return cf.fetcher.SetupNextFetch( + ctx, spans, nil /* spanIDs */, batchBytesLimit, firstBatchLimit, ) - if err != nil { - return err - } - cf.setFetcher(f, limitHint) - return nil -} - -// StartScanStreaming initializes and starts the key-value scan using the -// Streamer API. Can be used multiple times. -// -// The fetcher takes ownership of the spans slice - it can modify the slice and -// will perform the memory accounting accordingly. The caller can only reuse the -// spans slice after the fetcher has been closed (which happens when the fetcher -// emits the first zero batch), and if the caller does, it becomes responsible -// for the memory accounting. -func (cf *cFetcher) StartScanStreaming( - ctx context.Context, - streamer *kvstreamer.Streamer, - spans roachpb.Spans, - limitHint rowinfra.RowLimit, -) error { - kvBatchFetcher, err := row.NewTxnKVStreamer(ctx, streamer, spans, nil /* spanIDs */, cf.lockStrength) - if err != nil { - return err - } - f := row.NewKVStreamingFetcher(kvBatchFetcher) - cf.setFetcher(f, limitHint) - return nil } // fetcherState is the state enum for NextBatch. @@ -953,13 +887,17 @@ func (cf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) { case stateEmitLastBatch: cf.machine.state[0] = stateFinished cf.finalizeBatch() - // Close the fetcher eagerly so that its memory could be GCed. - cf.Close(ctx) + if cf.singleUse { + // Close the fetcher eagerly so that its memory could be GCed. + cf.Close(ctx) + } return cf.machine.batch, nil case stateFinished: - // Close the fetcher eagerly so that its memory could be GCed. - cf.Close(ctx) + if cf.singleUse { + // Close the fetcher eagerly so that its memory could be GCed. + cf.Close(ctx) + } return coldata.ZeroBatch, nil } } @@ -1346,7 +1284,7 @@ func (cf *cFetcher) convertFetchError(ctx context.Context, err error) error { // getBytesRead returns the number of bytes read by the cFetcher throughout its // existence so far. This number accumulates the bytes read statistic across -// StartScan* and Close methods. +// StartScan and Close methods. func (cf *cFetcher) getBytesRead() int64 { if cf.fetcher != nil { cf.bytesRead += cf.fetcher.ResetBytesRead() diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index a9829c1987fe..bd8d122f639f 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execreleasable" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/execstats" + "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/mon" @@ -92,9 +93,7 @@ func (s *ColBatchScan) Init(ctx context.Context) { limitBatches := !s.parallelize if err := s.cf.StartScan( s.Ctx, - s.flowCtx.Txn, s.Spans, - s.bsHeader, limitBatches, s.batchBytesLimit, s.limitHint, @@ -188,46 +187,55 @@ func NewColBatchScan( if nodeID, ok := flowCtx.NodeID.OptionalNodeID(); nodeID == 0 && ok { return nil, errors.Errorf("attempting to create a ColBatchScan with uninitialized NodeID") } + var bsHeader *roachpb.BoundedStalenessHeader + if aost := flowCtx.EvalCtx.AsOfSystemTime; aost != nil && aost.BoundedStaleness { + ts := aost.Timestamp + // If the descriptor's modification time is after the bounded staleness min bound, + // we have to increase the min bound. + // Otherwise, we would have table data which would not correspond to the correct + // schema. + if aost.Timestamp.Less(spec.TableDescriptorModificationTime) { + ts = spec.TableDescriptorModificationTime + } + bsHeader = &roachpb.BoundedStalenessHeader{ + MinTimestampBound: ts, + MinTimestampBoundStrict: aost.NearestOnly, + MaxTimestampBound: flowCtx.EvalCtx.AsOfSystemTime.MaxTimestampBound, // may be empty + } + } + limitHint := rowinfra.RowLimit(execinfra.LimitHint(spec.LimitHint, post)) tableArgs, err := populateTableArgs(ctx, flowCtx, &spec.FetchSpec) if err != nil { return nil, err } - fetcher := cFetcherPool.Get().(*cFetcher) - fetcher.cFetcherArgs = cFetcherArgs{ + kvFetcher := row.NewKVFetcher( + flowCtx.Txn, + bsHeader, + spec.Reverse, spec.LockingStrength, spec.LockingWaitPolicy, flowCtx.EvalCtx.SessionData().LockTimeout, + kvFetcherMemAcc, + flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, + ) + + fetcher := cFetcherPool.Get().(*cFetcher) + fetcher.cFetcherArgs = cFetcherArgs{ execinfra.GetWorkMemLimit(flowCtx), estimatedRowCount, - spec.Reverse, flowCtx.TraceKV, - flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, + true, /* singleUse */ } - if err = fetcher.Init(allocator, kvFetcherMemAcc, tableArgs); err != nil { + if err = fetcher.Init( + allocator, kvFetcher, tableArgs, + ); err != nil { fetcher.Release() return nil, err } - var bsHeader *roachpb.BoundedStalenessHeader - if aost := flowCtx.EvalCtx.AsOfSystemTime; aost != nil && aost.BoundedStaleness { - ts := aost.Timestamp - // If the descriptor's modification time is after the bounded staleness min bound, - // we have to increase the min bound. - // Otherwise, we would have table data which would not correspond to the correct - // schema. - if aost.Timestamp.Less(spec.TableDescriptorModificationTime) { - ts = spec.TableDescriptorModificationTime - } - bsHeader = &roachpb.BoundedStalenessHeader{ - MinTimestampBound: ts, - MinTimestampBoundStrict: aost.NearestOnly, - MaxTimestampBound: flowCtx.EvalCtx.AsOfSystemTime.MaxTimestampBound, // may be empty - } - } - s := colBatchScanPool.Get().(*ColBatchScan) s.Spans = spec.Spans if !flowCtx.Local { diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index bc79ba040001..0b94d2b5b3ef 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -19,7 +19,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/col/typeconv" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecspan" "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" @@ -125,12 +124,6 @@ type ColIndexJoin struct { // usesStreamer indicates whether the ColIndexJoin is using the Streamer // API. usesStreamer bool - streamerInfo struct { - *kvstreamer.Streamer - budgetAcc *mon.BoundAccount - budgetLimit int64 - diskBuffer kvstreamer.ResultDiskBuffer - } } var _ colexecop.KVReader = &ColIndexJoin{} @@ -148,30 +141,6 @@ func (s *ColIndexJoin) Init(ctx context.Context) { // tracing is enabled. s.Ctx, s.tracingSpan = execinfra.ProcessorSpan(s.Ctx, "colindexjoin") s.Input.Init(s.Ctx) - if s.usesStreamer { - s.streamerInfo.Streamer = kvstreamer.NewStreamer( - s.flowCtx.Cfg.DistSender, - s.flowCtx.Stopper(), - s.txn, - s.flowCtx.EvalCtx.Settings, - row.GetWaitPolicy(s.cf.lockWaitPolicy), - s.streamerInfo.budgetLimit, - s.streamerInfo.budgetAcc, - ) - mode := kvstreamer.OutOfOrder - if s.maintainOrdering { - mode = kvstreamer.InOrder - } - s.streamerInfo.Streamer.Init( - mode, - kvstreamer.Hints{ - UniqueRequests: true, - SingleRowLookup: true, - }, - int(s.cf.table.spec.MaxKeysPerRow), - s.streamerInfo.diskBuffer, - ) - } } type indexJoinState uint8 @@ -243,26 +212,13 @@ func (s *ColIndexJoin) Next() coldata.Batch { // the memory accounting - we don't double count for any memory of // spans because the spanAssembler released all of the relevant // memory from its account in GetSpans(). - var err error - if s.usesStreamer { - err = s.cf.StartScanStreaming( - s.Ctx, - s.streamerInfo.Streamer, - spans, - rowinfra.NoRowLimit, - ) - } else { - err = s.cf.StartScan( - s.Ctx, - s.txn, - spans, - nil, /* bsHeader */ - false, /* limitBatches */ - rowinfra.NoBytesLimit, - rowinfra.NoRowLimit, - ) - } - if err != nil { + if err := s.cf.StartScan( + s.Ctx, + spans, + false, /* limitBatches */ + rowinfra.NoBytesLimit, + rowinfra.NoRowLimit, + ); err != nil { colexecerror.InternalError(err) } s.state = indexJoinScanning @@ -521,8 +477,8 @@ func NewColIndexJoin( totalMemoryLimit := execinfra.GetWorkMemLimit(flowCtx) cFetcherMemoryLimit := totalMemoryLimit - var streamerBudgetLimit int64 + var kvFetcher *row.KVFetcher useStreamer := flowCtx.Txn != nil && flowCtx.Txn.Type() == kv.LeafTxn && flowCtx.MakeLeafTxn != nil && row.CanUseStreamer(ctx, flowCtx.EvalCtx.Settings) txn := flowCtx.Txn @@ -530,31 +486,57 @@ func NewColIndexJoin( if streamerBudgetAcc == nil { return nil, errors.AssertionFailedf("streamer budget account is nil when the Streamer API is desired") } + if spec.MaintainOrdering && diskMonitor == nil { + return nil, errors.AssertionFailedf("diskMonitor is nil when ordering needs to be maintained") + } // Keep 1/8th of the memory limit for the output batch of the cFetcher, // and we'll give the remaining memory to the streamer budget below. cFetcherMemoryLimit = int64(math.Ceil(float64(totalMemoryLimit) / 8.0)) - streamerBudgetLimit = 7 * cFetcherMemoryLimit + streamerBudgetLimit := 7 * cFetcherMemoryLimit txn, err = flowCtx.MakeLeafTxn() if err != nil { return nil, err } + kvFetcher = row.NewStreamingKVFetcher( + flowCtx.Cfg.DistSender, + flowCtx.Stopper(), + txn, + flowCtx.EvalCtx.Settings, + spec.LockingWaitPolicy, + spec.LockingStrength, + streamerBudgetLimit, + streamerBudgetAcc, + spec.MaintainOrdering, + true, /* singleRowLookup */ + int(spec.FetchSpec.MaxKeysPerRow), + rowcontainer.NewKVStreamerResultDiskBuffer( + flowCtx.Cfg.TempStorage, diskMonitor, + ), + ) + } else { + kvFetcher = row.NewKVFetcher( + txn, + nil, /* bsHeader */ + false, /* reverse */ + spec.LockingStrength, + spec.LockingWaitPolicy, + flowCtx.EvalCtx.SessionData().LockTimeout, + kvFetcherMemAcc, + flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, + ) } fetcher := cFetcherPool.Get().(*cFetcher) fetcher.cFetcherArgs = cFetcherArgs{ - spec.LockingStrength, - spec.LockingWaitPolicy, - flowCtx.EvalCtx.SessionData().LockTimeout, cFetcherMemoryLimit, // Note that the correct estimated row count will be set by the index // joiner for each set of spans to read. - 0, /* estimatedRowCount */ - false, /* reverse */ + 0, /* estimatedRowCount */ flowCtx.TraceKV, - flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, + false, /* singleUse */ } if err = fetcher.Init( - fetcherAllocator, kvFetcherMemAcc, tableArgs, + fetcherAllocator, kvFetcher, tableArgs, ); err != nil { fetcher.Release() return nil, err @@ -579,28 +561,18 @@ func NewColIndexJoin( useStreamer, flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, ) op.prepareMemLimit(inputTypes) - if useStreamer { - op.streamerInfo.budgetLimit = streamerBudgetLimit - op.streamerInfo.budgetAcc = streamerBudgetAcc - if spec.MaintainOrdering && diskMonitor == nil { - return nil, errors.AssertionFailedf("diskMonitor is nil when ordering needs to be maintained") - } - op.streamerInfo.diskBuffer = rowcontainer.NewKVStreamerResultDiskBuffer( - flowCtx.Cfg.TempStorage, diskMonitor, - ) - if cFetcherMemoryLimit < op.mem.inputBatchSizeLimit { - // If we have a low workmem limit, then we want to reduce the input - // batch size limit. - // - // The Streamer gets most of workmem as its budget which accounts - // for two usages - for the footprint of the spans themselves in the - // enqueued requests as well as the footprint of the responses - // received by the Streamer. If we don't reduce the input batch size - // limit here, then 8MiB value will be used, and the constructed - // spans (i.e. the enqueued requests) alone might exceed the budget - // leading to the Streamer erroring out in Enqueue(). - op.mem.inputBatchSizeLimit = cFetcherMemoryLimit - } + if useStreamer && cFetcherMemoryLimit < op.mem.inputBatchSizeLimit { + // If we have a low workmem limit, then we want to reduce the input + // batch size limit. + // + // The Streamer gets most of workmem as its budget which accounts for + // two usages - for the footprint of the spans themselves in the + // enqueued requests as well as the footprint of the responses received + // by the Streamer. If we don't reduce the input batch size limit here, + // then 8MiB value will be used, and the constructed spans (i.e. the + // enqueued requests) alone might exceed the budget leading to the + // Streamer erroring out in Enqueue(). + op.mem.inputBatchSizeLimit = cFetcherMemoryLimit } return op, nil @@ -688,8 +660,5 @@ func (s *ColIndexJoin) closeInternal() { // spanAssembler can be nil if Release() has already been called. s.spanAssembler.Close() } - if s.streamerInfo.Streamer != nil { - s.streamerInfo.Streamer.Close(ctx) - } s.batch = nil } diff --git a/pkg/sql/delete_preserving_index_test.go b/pkg/sql/delete_preserving_index_test.go index 73ba6b4ef386..9dadb18178d1 100644 --- a/pkg/sql/delete_preserving_index_test.go +++ b/pkg/sql/delete_preserving_index_test.go @@ -778,6 +778,7 @@ func fetchIndex( require.NoError(t, fetcher.Init( ctx, row.FetcherInitArgs{ + Txn: txn, Reverse: reverse, Alloc: &alloc, MemMonitor: mm.Monitor(), @@ -787,7 +788,7 @@ func fetchIndex( )) require.NoError(t, fetcher.StartScan( - ctx, txn, spans, nil /* spanIDs */, rowinfra.NoBytesLimit, 0, + ctx, spans, nil /* spanIDs */, rowinfra.NoBytesLimit, 0, )) var rows []tree.Datums for { diff --git a/pkg/sql/delete_range.go b/pkg/sql/delete_range.go index 07fa1b28f292..0ecf74b7e707 100644 --- a/pkg/sql/delete_range.go +++ b/pkg/sql/delete_range.go @@ -99,8 +99,9 @@ func (d *deleteRangeNode) startExec(params runParams) error { if err := d.fetcher.Init( params.ctx, row.FetcherInitArgs{ - Alloc: params.p.alloc, - Spec: &spec, + WillUseCustomKVFetcher: true, + Alloc: params.p.alloc, + Spec: &spec, }, ); err != nil { return err diff --git a/pkg/sql/indexbackfiller_test.go b/pkg/sql/indexbackfiller_test.go index 447506e4d618..3aaab0595104 100644 --- a/pkg/sql/indexbackfiller_test.go +++ b/pkg/sql/indexbackfiller_test.go @@ -409,6 +409,7 @@ INSERT INTO foo VALUES (1), (10), (100); require.NoError(t, fetcher.Init( ctx, row.FetcherInitArgs{ + Txn: txn, Alloc: &alloc, MemMonitor: mm.Monitor(), Spec: &spec, @@ -417,7 +418,7 @@ INSERT INTO foo VALUES (1), (10), (100); )) require.NoError(t, fetcher.StartScan( - ctx, txn, spans, nil /* spanIDs */, rowinfra.NoBytesLimit, 0, + ctx, spans, nil /* spanIDs */, rowinfra.NoBytesLimit, 0, )) var rows []tree.Datums for { diff --git a/pkg/sql/row/BUILD.bazel b/pkg/sql/row/BUILD.bazel index a83596101861..58134200e7b9 100644 --- a/pkg/sql/row/BUILD.bazel +++ b/pkg/sql/row/BUILD.bazel @@ -1,4 +1,5 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") +load("//pkg/testutils/buildutil:buildutil.bzl", "disallowed_imports_test") go_library( name = "row", @@ -27,6 +28,7 @@ go_library( "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvclient/kvstreamer", "//pkg/kv/kvserver/concurrency/lock", "//pkg/kv/kvserver/kvserverbase", @@ -69,6 +71,7 @@ go_library( "//pkg/util/log/eventpb", "//pkg/util/mon", "//pkg/util/protoutil", + "//pkg/util/stop", "//pkg/util/timeutil", "//pkg/util/unique", "//pkg/util/uuid", @@ -124,3 +127,8 @@ go_test( "@com_github_stretchr_testify//require", ], ) + +disallowed_imports_test( + "row", + ["//pkg/sql/execinfra"], +) diff --git a/pkg/sql/row/errors.go b/pkg/sql/row/errors.go index c798e325b145..c8389dc1eb77 100644 --- a/pkg/sql/row/errors.go +++ b/pkg/sql/row/errors.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -280,8 +281,9 @@ func DecodeRowInfo( if err := rf.Init( ctx, FetcherInitArgs{ - Alloc: &tree.DatumAlloc{}, - Spec: &spec, + WillUseCustomKVFetcher: true, + Alloc: &tree.DatumAlloc{}, + Spec: &spec, }, ); err != nil { return nil, nil, nil, err @@ -316,4 +318,10 @@ func DecodeRowInfo( return index, names, values, nil } +func (f *singleKVFetcher) SetupNextFetch( + context.Context, roachpb.Spans, []int, rowinfra.BytesLimit, rowinfra.KeyLimit, +) error { + return nil +} + func (f *singleKVFetcher) close(context.Context) {} diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go index abaf8f37e9c3..fecc63b81858 100644 --- a/pkg/sql/row/fetcher.go +++ b/pkg/sql/row/fetcher.go @@ -76,6 +76,15 @@ type kvBatchFetcherResponse struct { // KVBatchFetcher abstracts the logic of fetching KVs in batches. type KVBatchFetcher interface { + // SetupNextFetch prepares the fetch of the next set of spans. + SetupNextFetch( + ctx context.Context, + spans roachpb.Spans, + spanIDs []int, + batchBytesLimit rowinfra.BytesLimit, + firstBatchKeyLimit rowinfra.KeyLimit, + ) error + // nextBatch returns the next batch of rows. See kvBatchFetcherResponse for // details on what is returned. nextBatch(ctx context.Context) (kvBatchFetcherResponse, error) @@ -209,6 +218,17 @@ func (rf *Fetcher) Close(ctx context.Context) { // FetcherInitArgs contains arguments for Fetcher.Init. type FetcherInitArgs struct { + // StreamingKVFetcher, if non-nil, contains the KVFetcher that uses the + // kvstreamer.Streamer API under the hood. The caller is then expected to + // use only StartScan() method. + StreamingKVFetcher *KVFetcher + // WillUseCustomKVFetcher, if true, indicates that the caller will only use + // StartScanFrom() method and will be providing its own KVFetcher. + WillUseCustomKVFetcher bool + // Txn is the txn for the fetch. It might be nil, and the caller is expected + // to either provide the txn later via SetTxn() or to only use StartScanFrom + // method. + Txn *kv.Txn // Reverse denotes whether or not the spans should be read in reverse or not // when StartScan* methods are invoked. Reverse bool @@ -339,11 +359,49 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error { } } + if args.StreamingKVFetcher != nil { + if args.WillUseCustomKVFetcher { + return errors.AssertionFailedf( + "StreamingKVFetcher is non-nil when WillUseCustomKVFetcher is true", + ) + } + rf.kvFetcher = args.StreamingKVFetcher + } else if !args.WillUseCustomKVFetcher { + fetcherArgs := kvBatchFetcherArgs{ + reverse: args.Reverse, + lockStrength: args.LockStrength, + lockWaitPolicy: args.LockWaitPolicy, + lockTimeout: args.LockTimeout, + acc: rf.kvFetcherMemAcc, + forceProductionKVBatchSize: args.ForceProductionKVBatchSize, + } + if args.Txn != nil { + fetcherArgs.sendFn = makeKVBatchFetcherDefaultSendFunc(args.Txn) + fetcherArgs.requestAdmissionHeader = args.Txn.AdmissionHeader() + fetcherArgs.responseAdmissionQ = args.Txn.DB().SQLKVResponseAdmissionQ + } + rf.kvFetcher = newKVFetcher(newKVBatchFetcher(fetcherArgs)) + } + return nil } +// SetTxn updates the Fetcher to use the provided txn. +func (rf *Fetcher) SetTxn(txn *kv.Txn) { + rf.setTxnAndSendFn(txn, makeKVBatchFetcherDefaultSendFunc(txn)) +} + +// setTxnAndSendFn peeks inside of the KVFetcher to update the underlying +// txnKVFetcher with the new txn and sendFn. +func (rf *Fetcher) setTxnAndSendFn(txn *kv.Txn, sendFn sendFunc) { + f := rf.kvFetcher.KVBatchFetcher.(*txnKVFetcher) + f.sendFn = sendFn + f.requestAdmissionHeader = txn.AdmissionHeader() + f.responseAdmissionQ = txn.DB().SQLKVResponseAdmissionQ +} + // StartScan initializes and starts the key-value scan. Can be used multiple -// times. +// times. Cannot be used if WillUseCustomKVFetcher was set to true in Init(). // // The fetcher takes ownership of the spans slice - it can modify the slice and // will perform the memory accounting accordingly (if Init() was called with @@ -377,40 +435,29 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error { // argument that some number of rows will eventually satisfy the query and we // likely don't need to scan `spans` fully. The bytes limit, on the other hand, // is simply intended to protect against OOMs. +// +// Batch limits can only be used if the spans are ordered. func (rf *Fetcher) StartScan( ctx context.Context, - txn *kv.Txn, spans roachpb.Spans, spanIDs []int, batchBytesLimit rowinfra.BytesLimit, rowLimitHint rowinfra.RowLimit, ) error { + if rf.args.WillUseCustomKVFetcher { + return errors.AssertionFailedf("StartScan is called instead of StartScanFrom") + } if len(spans) == 0 { return errors.AssertionFailedf("no spans") } - f, err := makeKVBatchFetcher( - ctx, - kvBatchFetcherArgs{ - sendFn: makeKVBatchFetcherDefaultSendFunc(txn), - spans: spans, - spanIDs: spanIDs, - reverse: rf.args.Reverse, - batchBytesLimit: batchBytesLimit, - firstBatchKeyLimit: rf.rowLimitToKeyLimit(rowLimitHint), - lockStrength: rf.args.LockStrength, - lockWaitPolicy: rf.args.LockWaitPolicy, - lockTimeout: rf.args.LockTimeout, - acc: rf.kvFetcherMemAcc, - forceProductionKVBatchSize: rf.args.ForceProductionKVBatchSize, - requestAdmissionHeader: txn.AdmissionHeader(), - responseAdmissionQ: txn.DB().SQLKVResponseAdmissionQ, - }, - ) - if err != nil { + if err := rf.kvFetcher.SetupNextFetch( + ctx, spans, spanIDs, batchBytesLimit, rf.rowLimitToKeyLimit(rowLimitHint), + ); err != nil { return err } - return rf.StartScanFrom(ctx, &f) + + return rf.startScan(ctx) } // TestingInconsistentScanSleep introduces a sleep inside the fetcher after @@ -428,7 +475,10 @@ var TestingInconsistentScanSleep time.Duration // that has passed. See the documentation for TableReaderSpec for more // details. // -// Can be used multiple times. +// Can be used multiple times. Cannot be used if WillUseCustomKVFetcher was set +// to true in Init(). +// +// Batch limits can only be used if the spans are ordered. func (rf *Fetcher) StartInconsistentScan( ctx context.Context, db *kv.DB, @@ -439,6 +489,12 @@ func (rf *Fetcher) StartInconsistentScan( rowLimitHint rowinfra.RowLimit, qualityOfService sessiondatapb.QoSLevel, ) error { + if rf.args.StreamingKVFetcher != nil { + return errors.AssertionFailedf("StartInconsistentScan is called instead of StartScanFrom") + } + if rf.args.WillUseCustomKVFetcher { + return errors.AssertionFailedf("StartInconsistentScan is called instead of StartScanFrom") + } if len(spans) == 0 { return errors.AssertionFailedf("no spans") } @@ -491,28 +547,15 @@ func (rf *Fetcher) StartInconsistentScan( // TODO(radu): we should commit the last txn. Right now the commit is a no-op // on read transactions, but perhaps one day it will release some resources. - f, err := makeKVBatchFetcher( - ctx, - kvBatchFetcherArgs{ - sendFn: sendFn, - spans: spans, - spanIDs: nil, - reverse: rf.args.Reverse, - batchBytesLimit: batchBytesLimit, - firstBatchKeyLimit: rf.rowLimitToKeyLimit(rowLimitHint), - lockStrength: rf.args.LockStrength, - lockWaitPolicy: rf.args.LockWaitPolicy, - lockTimeout: rf.args.LockTimeout, - acc: rf.kvFetcherMemAcc, - forceProductionKVBatchSize: rf.args.ForceProductionKVBatchSize, - requestAdmissionHeader: txn.AdmissionHeader(), - responseAdmissionQ: txn.DB().SQLKVResponseAdmissionQ, - }, - ) - if err != nil { + rf.setTxnAndSendFn(txn, sendFn) + + if err := rf.kvFetcher.SetupNextFetch( + ctx, spans, nil /* spanIDs */, batchBytesLimit, rf.rowLimitToKeyLimit(rowLimitHint), + ); err != nil { return err } - return rf.StartScanFrom(ctx, &f) + + return rf.startScan(ctx) } func (rf *Fetcher) rowLimitToKeyLimit(rowLimitHint rowinfra.RowLimit) rowinfra.KeyLimit { @@ -530,14 +573,22 @@ func (rf *Fetcher) rowLimitToKeyLimit(rowLimitHint rowinfra.RowLimit) rowinfra.K return rowinfra.KeyLimit(int64(rowLimitHint)*int64(rf.table.spec.MaxKeysPerRow) + 1) } -// StartScanFrom initializes and starts a scan from the given KVBatchFetcher. Can be -// used multiple times. +// StartScanFrom initializes and starts a scan from the given KVBatchFetcher. +// Can be used multiple times. Cannot be used if WillUseCustomKVFetcher was set +// to false in Init(). func (rf *Fetcher) StartScanFrom(ctx context.Context, f KVBatchFetcher) error { - rf.indexKey = nil + if !rf.args.WillUseCustomKVFetcher { + return errors.AssertionFailedf("StartScanFrom is called instead of StartScan") + } if rf.kvFetcher != nil { rf.kvFetcher.Close(ctx) } rf.kvFetcher = newKVFetcher(f) + return rf.startScan(ctx) +} + +func (rf *Fetcher) startScan(ctx context.Context) error { + rf.indexKey = nil rf.kvEnd = false // Retrieve the first key. var err error diff --git a/pkg/sql/row/fetcher_mvcc_test.go b/pkg/sql/row/fetcher_mvcc_test.go index e1dcb6826525..26735d7ac3c2 100644 --- a/pkg/sql/row/fetcher_mvcc_test.go +++ b/pkg/sql/row/fetcher_mvcc_test.go @@ -97,8 +97,9 @@ func TestRowFetcherMVCCMetadata(t *testing.T) { if err := rf.Init( ctx, row.FetcherInitArgs{ - Alloc: &tree.DatumAlloc{}, - Spec: &spec, + WillUseCustomKVFetcher: true, + Alloc: &tree.DatumAlloc{}, + Spec: &spec, }, ); err != nil { t.Fatal(err) diff --git a/pkg/sql/row/fetcher_test.go b/pkg/sql/row/fetcher_test.go index 4e6de73c1405..2de241c2982b 100644 --- a/pkg/sql/row/fetcher_test.go +++ b/pkg/sql/row/fetcher_test.go @@ -62,6 +62,7 @@ func makeIndexFetchSpec(t *testing.T, entry initFetcherArgs) descpb.IndexFetchSp func initFetcher( t *testing.T, + txn *kv.Txn, entry initFetcherArgs, reverseScan bool, alloc *tree.DatumAlloc, @@ -74,6 +75,7 @@ func initFetcher( if err := fetcher.Init( context.Background(), FetcherInitArgs{ + Txn: txn, Reverse: reverseScan, Alloc: alloc, MemMonitor: memMon, @@ -147,11 +149,11 @@ func TestNextRowSingle(t *testing.T) { indexIdx: 0, } - rf := initFetcher(t, args, false /*reverseScan*/, alloc, nil /* memMon */) + txn := kv.NewTxn(ctx, kvDB, 0) + rf := initFetcher(t, txn, args, false /*reverseScan*/, alloc, nil /* memMon */) if err := rf.StartScan( context.Background(), - kv.NewTxn(ctx, kvDB, 0), roachpb.Spans{tableDesc.IndexSpan(keys.SystemSQLCodec, tableDesc.GetPrimaryIndexID())}, nil, /* spanIDs */ rowinfra.NoBytesLimit, @@ -250,11 +252,11 @@ func TestNextRowBatchLimiting(t *testing.T) { indexIdx: 0, } - rf := initFetcher(t, args, false /*reverseScan*/, alloc, nil /*memMon*/) + txn := kv.NewTxn(ctx, kvDB, 0) + rf := initFetcher(t, txn, args, false /*reverseScan*/, alloc, nil /*memMon*/) if err := rf.StartScan( context.Background(), - kv.NewTxn(ctx, kvDB, 0), roachpb.Spans{tableDesc.IndexSpan(keys.SystemSQLCodec, tableDesc.GetPrimaryIndexID())}, nil, /* spanIDs */ rowinfra.GetDefaultBatchBytesLimit(false /* forceProductionValue */), @@ -342,12 +344,12 @@ func TestRowFetcherMemoryLimits(t *testing.T) { memMon := mon.NewMonitor("test", mon.MemoryResource, nil, nil, -1, 1000, settings) memMon.Start(ctx, nil, mon.MakeStandaloneBudget(1<<20)) defer memMon.Stop(ctx) - rf := initFetcher(t, args, false /*reverseScan*/, alloc, memMon) + txn := kv.NewTxn(ctx, kvDB, 0) + rf := initFetcher(t, txn, args, false /*reverseScan*/, alloc, memMon) defer rf.Close(ctx) err := rf.StartScan( context.Background(), - kv.NewTxn(ctx, kvDB, 0), roachpb.Spans{tableDesc.IndexSpan(keys.SystemSQLCodec, tableDesc.GetPrimaryIndexID())}, nil, /* spanIDs */ rowinfra.NoBytesLimit, @@ -401,7 +403,8 @@ INDEX(c) indexIdx: 0, } - rf := initFetcher(t, args, false /*reverseScan*/, alloc, nil /*memMon*/) + txn := kv.NewTxn(ctx, kvDB, 0) + rf := initFetcher(t, txn, args, false /*reverseScan*/, alloc, nil /*memMon*/) // Start a scan that has multiple input spans, to tickle the codepath that // sees an "empty batch". When we have multiple input spans, the kv server @@ -421,7 +424,6 @@ INDEX(c) if err := rf.StartScan( context.Background(), - kv.NewTxn(ctx, kvDB, 0), roachpb.Spans{indexSpan, roachpb.Span{Key: midKey, EndKey: endKey}, }, @@ -576,11 +578,11 @@ func TestNextRowSecondaryIndex(t *testing.T) { args.columns = []int{0, 1, 2, 3} } - rf := initFetcher(t, args, false /*reverseScan*/, alloc, nil /*memMon*/) + txn := kv.NewTxn(ctx, kvDB, 0) + rf := initFetcher(t, txn, args, false /*reverseScan*/, alloc, nil /*memMon*/) if err := rf.StartScan( context.Background(), - kv.NewTxn(ctx, kvDB, 0), roachpb.Spans{tableDesc.IndexSpan(keys.SystemSQLCodec, tableDesc.PublicNonPrimaryIndexes()[0].GetID())}, nil, /* spanIDs */ rowinfra.NoBytesLimit, @@ -683,14 +685,15 @@ func TestRowFetcherReset(t *testing.T) { tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, keys.SystemSQLCodec, sqlutils.TestDB, "foo") + var txn *kv.Txn args := initFetcherArgs{ tableDesc: tableDesc, indexIdx: 0, } da := tree.DatumAlloc{} - fetcher := initFetcher(t, args, false, &da, nil /*memMon*/) + fetcher := initFetcher(t, txn, args, false, &da, nil /*memMon*/) - resetFetcher := initFetcher(t, args, false /*reverseScan*/, &da, nil /*memMon*/) + resetFetcher := initFetcher(t, txn, args, false /*reverseScan*/, &da, nil /*memMon*/) resetFetcher.Reset() @@ -701,6 +704,7 @@ func TestRowFetcherReset(t *testing.T) { if err := resetFetcher.Init( ctx, FetcherInitArgs{ + Txn: txn, Alloc: &da, Spec: &spec, }, diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index 0aa76dd34478..c05a79656c7c 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -237,11 +237,7 @@ func makeKVBatchFetcherDefaultSendFunc(txn *kv.Txn) sendFunc { type kvBatchFetcherArgs struct { sendFn sendFunc - spans roachpb.Spans - spanIDs []int reverse bool - batchBytesLimit rowinfra.BytesLimit - firstBatchKeyLimit rowinfra.KeyLimit lockStrength descpb.ScanLockingStrength lockWaitPolicy descpb.ScanLockingWaitPolicy lockTimeout time.Duration @@ -251,10 +247,26 @@ type kvBatchFetcherArgs struct { responseAdmissionQ *admission.WorkQueue } -// makeKVBatchFetcher initializes a KVBatchFetcher for the given spans. If -// useBatchLimit is true, the number of result keys per batch is limited; the -// limit grows between subsequent batches, starting at firstBatchKeyLimit (if not -// 0) to ProductionKVBatchSize. +// newKVBatchFetcher initializes a KVBatchFetcher. +// +// The passed-in memory account is owned by the fetcher throughout its lifetime +// but is **not** closed - it is the caller's responsibility to close acc if it +// is non-nil. +func newKVBatchFetcher(args kvBatchFetcherArgs) *txnKVFetcher { + return &txnKVFetcher{ + sendFn: args.sendFn, + reverse: args.reverse, + lockStrength: getKeyLockingStrength(args.lockStrength), + lockWaitPolicy: getWaitPolicy(args.lockWaitPolicy), + lockTimeout: args.lockTimeout, + acc: args.acc, + forceProductionKVBatchSize: args.forceProductionKVBatchSize, + requestAdmissionHeader: args.requestAdmissionHeader, + responseAdmissionQ: args.responseAdmissionQ, + } +} + +// SetupNextFetch sets up the Fetcher for the next set of spans. // // The fetcher takes ownership of the spans slice - it can modify the slice and // will perform the memory accounting accordingly (if acc is non-nil). The @@ -270,48 +282,52 @@ type kvBatchFetcherArgs struct { // If spanIDs is non-nil, then it must be of the same length as spans. // // Batch limits can only be used if the spans are ordered. -// -// The passed-in memory account is owned by the fetcher throughout its lifetime -// but is **not** closed - it is the caller's responsibility to close acc if it -// is non-nil. -func makeKVBatchFetcher(ctx context.Context, args kvBatchFetcherArgs) (txnKVFetcher, error) { - if args.firstBatchKeyLimit < 0 || (args.batchBytesLimit == 0 && args.firstBatchKeyLimit != 0) { - // Passing firstBatchKeyLimit without batchBytesLimit doesn't make sense - the - // only reason to not set batchBytesLimit is in order to get DistSender-level - // parallelism, and setting firstBatchKeyLimit inhibits that. - return txnKVFetcher{}, errors.Errorf("invalid batch limit %d (batchBytesLimit: %d)", - args.firstBatchKeyLimit, args.batchBytesLimit) +func (f *txnKVFetcher) SetupNextFetch( + ctx context.Context, + spans roachpb.Spans, + spanIDs []int, + batchBytesLimit rowinfra.BytesLimit, + firstBatchKeyLimit rowinfra.KeyLimit, +) error { + f.reset(ctx) + + if firstBatchKeyLimit < 0 || (batchBytesLimit == 0 && firstBatchKeyLimit != 0) { + // Passing firstBatchKeyLimit without batchBytesLimit doesn't make sense + // - the only reason to not set batchBytesLimit is in order to get + // DistSender-level parallelism, and setting firstBatchKeyLimit inhibits + // that. + return errors.Errorf("invalid batch limit %d (batchBytesLimit: %d)", firstBatchKeyLimit, batchBytesLimit) } - if args.batchBytesLimit != 0 { + if batchBytesLimit != 0 { // Verify the spans are ordered if a batch limit is used. - for i := 1; i < len(args.spans); i++ { - prevKey := args.spans[i-1].EndKey + for i := 1; i < len(spans); i++ { + prevKey := spans[i-1].EndKey if prevKey == nil { // This is the case of a GetRequest. - prevKey = args.spans[i-1].Key + prevKey = spans[i-1].Key } - if args.spans[i].Key.Compare(prevKey) < 0 { - return txnKVFetcher{}, errors.Errorf("unordered spans (%s %s)", args.spans[i-1], args.spans[i]) + if spans[i].Key.Compare(prevKey) < 0 { + return errors.Errorf("unordered spans (%s %s)", spans[i-1], spans[i]) } } } else if util.RaceEnabled { // Otherwise, just verify the spans don't contain consecutive overlapping // spans. - for i := 1; i < len(args.spans); i++ { - prevEndKey := args.spans[i-1].EndKey + for i := 1; i < len(spans); i++ { + prevEndKey := spans[i-1].EndKey if prevEndKey == nil { - prevEndKey = args.spans[i-1].Key + prevEndKey = spans[i-1].Key } - curEndKey := args.spans[i].EndKey + curEndKey := spans[i].EndKey if curEndKey == nil { - curEndKey = args.spans[i].Key + curEndKey = spans[i].Key } - if args.spans[i].Key.Compare(prevEndKey) >= 0 { + if spans[i].Key.Compare(prevEndKey) >= 0 { // Current span's start key is greater than or equal to the last span's // end key - we're good. continue - } else if curEndKey.Compare(args.spans[i-1].Key) <= 0 { + } else if curEndKey.Compare(spans[i-1].Key) <= 0 { // Current span's end key is less than or equal to the last span's start // key - also good. continue @@ -319,34 +335,23 @@ func makeKVBatchFetcher(ctx context.Context, args kvBatchFetcherArgs) (txnKVFetc // Otherwise, the two spans overlap, which isn't allowed - it leaves us at // risk of incorrect results, since the row fetcher can't distinguish // between identical rows in two different batches. - return txnKVFetcher{}, errors.Errorf("overlapping neighbor spans (%s %s)", args.spans[i-1], args.spans[i]) + return errors.Errorf("overlapping neighbor spans (%s %s)", spans[i-1], spans[i]) } } - f := txnKVFetcher{ - sendFn: args.sendFn, - reverse: args.reverse, - batchBytesLimit: args.batchBytesLimit, - firstBatchKeyLimit: args.firstBatchKeyLimit, - lockStrength: getKeyLockingStrength(args.lockStrength), - lockWaitPolicy: GetWaitPolicy(args.lockWaitPolicy), - lockTimeout: args.lockTimeout, - acc: args.acc, - forceProductionKVBatchSize: args.forceProductionKVBatchSize, - requestAdmissionHeader: args.requestAdmissionHeader, - responseAdmissionQ: args.responseAdmissionQ, - } + f.batchBytesLimit = batchBytesLimit + f.firstBatchKeyLimit = firstBatchKeyLimit // Account for the memory of the spans that we're taking the ownership of. if f.acc != nil { - f.spansAccountedFor = args.spans.MemUsage() + f.spansAccountedFor = spans.MemUsage() if err := f.acc.Grow(ctx, f.spansAccountedFor); err != nil { - return txnKVFetcher{}, err + return err } } - if args.spanIDs != nil && len(args.spans) != len(args.spanIDs) { - return txnKVFetcher{}, errors.AssertionFailedf("unexpectedly non-nil spanIDs slice has a different length than spans") + if spanIDs != nil && len(spans) != len(spanIDs) { + return errors.AssertionFailedf("unexpectedly non-nil spanIDs slice has a different length than spans") } // Since the fetcher takes ownership of the spans slice, we don't need to @@ -354,10 +359,10 @@ func makeKVBatchFetcher(ctx context.Context, args kvBatchFetcherArgs) (txnKVFetc // fetcher receives the resume spans), but the fetcher will always keep the // memory accounting up to date. f.spans = identifiableSpans{ - Spans: args.spans, - spanIDs: args.spanIDs, + Spans: spans, + spanIDs: spanIDs, } - if args.reverse { + if f.reverse { // Reverse scans receive the spans in decreasing order. Note that we // need to be this tricky since we're updating the spans in place. i, j := 0, f.spans.Len()-1 @@ -371,7 +376,7 @@ func makeKVBatchFetcher(ctx context.Context, args kvBatchFetcherArgs) (txnKVFetc // larger slices when processing the resume spans. f.scratchSpans = f.spans - return f, nil + return nil } // fetch retrieves spans from the kv layer. @@ -603,14 +608,21 @@ func (f *txnKVFetcher) nextBatch(ctx context.Context) (resp kvBatchFetcherRespon return f.nextBatch(ctx) } -// close releases the resources of this txnKVFetcher. -func (f *txnKVFetcher) close(ctx context.Context) { +func (f *txnKVFetcher) reset(ctx context.Context) { + f.alreadyFetched = false + f.batchIdx = 0 f.responses = nil f.remainingBatches = nil f.spans = identifiableSpans{} f.scratchSpans = identifiableSpans{} // Release only the allocations made by this fetcher. f.acc.Shrink(ctx, f.batchResponseAccountedFor+f.spansAccountedFor) + f.batchResponseAccountedFor, f.spansAccountedFor = 0, 0 +} + +// close releases the resources of this txnKVFetcher. +func (f *txnKVFetcher) close(ctx context.Context) { + f.reset(ctx) } // spansToRequests converts the provided spans to the corresponding requests. If diff --git a/pkg/sql/row/kv_batch_streamer.go b/pkg/sql/row/kv_batch_streamer.go index f83a21eddece..8c768735573a 100644 --- a/pkg/sql/row/kv_batch_streamer.go +++ b/pkg/sql/row/kv_batch_streamer.go @@ -14,11 +14,14 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" ) // CanUseStreamer returns whether the kvstreamer.Streamer API should be used. @@ -37,11 +40,13 @@ var useStreamerEnabled = settings.RegisterBoolSetting( true, ) -// TxnKVStreamer handles retrieval of key/values. -type TxnKVStreamer struct { - streamer *kvstreamer.Streamer - spans roachpb.Spans - spanIDs []int +// txnKVStreamer handles retrieval of key/values. +type txnKVStreamer struct { + streamer *kvstreamer.Streamer + keyLocking lock.Strength + + spans roachpb.Spans + spanIDs []int // getResponseScratch is reused to return the result of Get requests. getResponseScratch [1]roachpb.KeyValue @@ -54,32 +59,43 @@ type TxnKVStreamer struct { } } -var _ KVBatchFetcher = &TxnKVStreamer{} +var _ KVBatchFetcher = &txnKVStreamer{} + +// newTxnKVStreamer creates a new txnKVStreamer. +func newTxnKVStreamer( + streamer *kvstreamer.Streamer, lockStrength descpb.ScanLockingStrength, +) KVBatchFetcher { + return &txnKVStreamer{ + streamer: streamer, + keyLocking: getKeyLockingStrength(lockStrength), + } +} -// NewTxnKVStreamer creates a new TxnKVStreamer. -func NewTxnKVStreamer( +// SetupNextFetch implements the KVBatchFetcher interface. +func (f *txnKVStreamer) SetupNextFetch( ctx context.Context, - streamer *kvstreamer.Streamer, spans roachpb.Spans, spanIDs []int, - lockStrength descpb.ScanLockingStrength, -) (*TxnKVStreamer, error) { + bytesLimit rowinfra.BytesLimit, + _ rowinfra.KeyLimit, +) error { + if bytesLimit != rowinfra.NoBytesLimit { + return errors.AssertionFailedf("unexpectedly non-zero bytes limit for txnKVStreamer") + } + f.reset(ctx) if log.ExpensiveLogEnabled(ctx, 2) { log.VEventf(ctx, 2, "Scan %s", spans) } - keyLocking := getKeyLockingStrength(lockStrength) - reqs := spansToRequests(spans, false /* reverse */, keyLocking) - if err := streamer.Enqueue(ctx, reqs); err != nil { - return nil, err + reqs := spansToRequests(spans, false /* reverse */, f.keyLocking) + if err := f.streamer.Enqueue(ctx, reqs); err != nil { + return err } - return &TxnKVStreamer{ - streamer: streamer, - spans: spans, - spanIDs: spanIDs, - }, nil + f.spans = spans + f.spanIDs = spanIDs + return nil } -func (f *TxnKVStreamer) getSpanID(resultPosition int) int { +func (f *txnKVStreamer) getSpanID(resultPosition int) int { if f.spanIDs == nil { return resultPosition } @@ -89,7 +105,7 @@ func (f *TxnKVStreamer) getSpanID(resultPosition int) int { // proceedWithLastResult processes the result which must be already set on the // lastResultState and emits the first part of the response (the only part for // GetResponses). -func (f *TxnKVStreamer) proceedWithLastResult( +func (f *txnKVStreamer) proceedWithLastResult( ctx context.Context, ) (skip bool, _ kvBatchFetcherResponse, _ error) { result := f.lastResultState.Result @@ -122,13 +138,13 @@ func (f *TxnKVStreamer) proceedWithLastResult( return false, ret, nil } -func (f *TxnKVStreamer) releaseLastResult(ctx context.Context) { +func (f *txnKVStreamer) releaseLastResult(ctx context.Context) { f.lastResultState.Release(ctx) f.lastResultState.Result = kvstreamer.Result{} } // nextBatch implements the KVBatchFetcher interface. -func (f *TxnKVStreamer) nextBatch(ctx context.Context) (kvBatchFetcherResponse, error) { +func (f *txnKVStreamer) nextBatch(ctx context.Context) (kvBatchFetcherResponse, error) { // Check whether there are more batches in the current ScanResponse. if len(f.lastResultState.remainingBatches) > 0 { ret := kvBatchFetcherResponse{ @@ -171,11 +187,17 @@ func (f *TxnKVStreamer) nextBatch(ctx context.Context) (kvBatchFetcherResponse, return f.nextBatch(ctx) } -// close releases the resources of this TxnKVStreamer. -func (f *TxnKVStreamer) close(ctx context.Context) { +// reset releases all of the results from the last fetch. +func (f *txnKVStreamer) reset(ctx context.Context) { f.lastResultState.Release(ctx) for _, r := range f.results { r.Release(ctx) } - *f = TxnKVStreamer{} +} + +// close releases the resources of this txnKVStreamer. +func (f *txnKVStreamer) close(ctx context.Context) { + f.reset(ctx) + f.streamer.Close(ctx) + *f = txnKVStreamer{} } diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go index 0a92694809ec..8065d97c25a3 100644 --- a/pkg/sql/row/kv_fetcher.go +++ b/pkg/sql/row/kv_fetcher.go @@ -16,13 +16,17 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/errors" ) @@ -59,20 +63,15 @@ type KVFetcher struct { // // If spanIDs is non-nil, then it must be of the same length as spans. func NewKVFetcher( - ctx context.Context, txn *kv.Txn, - spans roachpb.Spans, - spanIDs []int, bsHeader *roachpb.BoundedStalenessHeader, reverse bool, - batchBytesLimit rowinfra.BytesLimit, - firstBatchLimit rowinfra.KeyLimit, lockStrength descpb.ScanLockingStrength, lockWaitPolicy descpb.ScanLockingWaitPolicy, lockTimeout time.Duration, acc *mon.BoundAccount, forceProductionKVBatchSize bool, -) (*KVFetcher, error) { +) *KVFetcher { var sendFn sendFunc // Avoid the heap allocation by allocating sendFn specifically in the if. if bsHeader == nil { @@ -99,33 +98,66 @@ func NewKVFetcher( } } - kvBatchFetcher, err := makeKVBatchFetcher( - ctx, - kvBatchFetcherArgs{ - sendFn: sendFn, - spans: spans, - spanIDs: spanIDs, - reverse: reverse, - batchBytesLimit: batchBytesLimit, - firstBatchKeyLimit: firstBatchLimit, - lockStrength: lockStrength, - lockWaitPolicy: lockWaitPolicy, - lockTimeout: lockTimeout, - acc: acc, - forceProductionKVBatchSize: forceProductionKVBatchSize, - requestAdmissionHeader: txn.AdmissionHeader(), - responseAdmissionQ: txn.DB().SQLKVResponseAdmissionQ, - }, - ) - return newKVFetcher(&kvBatchFetcher), err + fetcherArgs := kvBatchFetcherArgs{ + sendFn: sendFn, + reverse: reverse, + lockStrength: lockStrength, + lockWaitPolicy: lockWaitPolicy, + lockTimeout: lockTimeout, + acc: acc, + forceProductionKVBatchSize: forceProductionKVBatchSize, + } + if txn != nil { + // In most cases, the txn is non-nil; however, in some code paths (e.g. + // when executing EXPLAIN (VEC)) it might be nil, so we need to have + // this check. + fetcherArgs.requestAdmissionHeader = txn.AdmissionHeader() + fetcherArgs.responseAdmissionQ = txn.DB().SQLKVResponseAdmissionQ + } + return newKVFetcher(newKVBatchFetcher(fetcherArgs)) } -// NewKVStreamingFetcher returns a new KVFetcher that utilizes the provided -// TxnKVStreamer to perform KV reads. -func NewKVStreamingFetcher(streamer *TxnKVStreamer) *KVFetcher { - return &KVFetcher{ - KVBatchFetcher: streamer, +// NewStreamingKVFetcher returns a new KVFetcher that utilizes the provided +// kvstreamer.Streamer to perform KV reads. +// +// If maintainOrdering is true, then diskBuffer must be non-nil. +func NewStreamingKVFetcher( + distSender *kvcoord.DistSender, + stopper *stop.Stopper, + txn *kv.Txn, + st *cluster.Settings, + lockWaitPolicy descpb.ScanLockingWaitPolicy, + lockStrength descpb.ScanLockingStrength, + streamerBudgetLimit int64, + streamerBudgetAcc *mon.BoundAccount, + maintainOrdering bool, + singleRowLookup bool, + maxKeysPerRow int, + diskBuffer kvstreamer.ResultDiskBuffer, +) *KVFetcher { + streamer := kvstreamer.NewStreamer( + distSender, + stopper, + txn, + st, + getWaitPolicy(lockWaitPolicy), + streamerBudgetLimit, + streamerBudgetAcc, + ) + mode := kvstreamer.OutOfOrder + if maintainOrdering { + mode = kvstreamer.InOrder } + streamer.Init( + mode, + kvstreamer.Hints{ + UniqueRequests: true, + SingleRowLookup: singleRowLookup, + }, + maxKeysPerRow, + diskBuffer, + ) + return newKVFetcher(newTxnKVStreamer(streamer, lockStrength)) } func newKVFetcher(batchFetcher KVBatchFetcher) *KVFetcher { @@ -236,6 +268,23 @@ func (f *KVFetcher) NextKV( } } +// SetupNextFetch overrides the same method from the wrapped KVBatchFetcher in +// order reset this KVFetcher. +func (f *KVFetcher) SetupNextFetch( + ctx context.Context, + spans roachpb.Spans, + spanIDs []int, + batchBytesLimit rowinfra.BytesLimit, + firstBatchKeyLimit rowinfra.KeyLimit, +) error { + f.kvs = nil + f.batchResponse = nil + f.spanID = 0 + return f.KVBatchFetcher.SetupNextFetch( + ctx, spans, spanIDs, batchBytesLimit, firstBatchKeyLimit, + ) +} + // Close releases the resources held by this KVFetcher. It must be called // at the end of execution if the fetcher was provisioned with a memory // monitor. @@ -263,6 +312,13 @@ func (f *SpanKVFetcher) nextBatch(ctx context.Context) (kvBatchFetcherResponse, }, nil } +// SetupNextFetch implements the KVBatchFetcher interface. +func (f *SpanKVFetcher) SetupNextFetch( + context.Context, roachpb.Spans, []int, rowinfra.BytesLimit, rowinfra.KeyLimit, +) error { + return nil +} + func (f *SpanKVFetcher) close(context.Context) {} // BackupSSTKVFetcher is a KVBatchFetcher that wraps storage.SimpleMVCCIterator @@ -366,6 +422,13 @@ func (f *BackupSSTKVFetcher) nextBatch(ctx context.Context) (kvBatchFetcherRespo }, nil } +// SetupNextFetch implements the KVBatchFetcher interface. +func (f *BackupSSTKVFetcher) SetupNextFetch( + context.Context, roachpb.Spans, []int, rowinfra.BytesLimit, rowinfra.KeyLimit, +) error { + return nil +} + func (f *BackupSSTKVFetcher) close(context.Context) { f.iter.Close() } diff --git a/pkg/sql/row/locking.go b/pkg/sql/row/locking.go index 02d155d46a6a..52d51f74815f 100644 --- a/pkg/sql/row/locking.go +++ b/pkg/sql/row/locking.go @@ -44,9 +44,9 @@ func getKeyLockingStrength(lockStrength descpb.ScanLockingStrength) lock.Strengt } } -// GetWaitPolicy returns the configured lock wait policy to use for key-value +// getWaitPolicy returns the configured lock wait policy to use for key-value // scans. -func GetWaitPolicy(lockWaitPolicy descpb.ScanLockingWaitPolicy) lock.WaitPolicy { +func getWaitPolicy(lockWaitPolicy descpb.ScanLockingWaitPolicy) lock.WaitPolicy { switch lockWaitPolicy { case descpb.ScanLockingWaitPolicy_BLOCK: return lock.WaitPolicy_Block diff --git a/pkg/sql/rowexec/BUILD.bazel b/pkg/sql/rowexec/BUILD.bazel index 805876bc7a99..3cde258ed6e5 100644 --- a/pkg/sql/rowexec/BUILD.bazel +++ b/pkg/sql/rowexec/BUILD.bazel @@ -46,7 +46,6 @@ go_library( "//pkg/keys", "//pkg/kv", "//pkg/kv/kvclient/kvstreamer", - "//pkg/kv/kvserver/concurrency/lock", "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb", "//pkg/server/telemetry", diff --git a/pkg/sql/rowexec/indexbackfiller.go b/pkg/sql/rowexec/indexbackfiller.go index 35b15957d6fb..d8f92e0f56ff 100644 --- a/pkg/sql/rowexec/indexbackfiller.go +++ b/pkg/sql/rowexec/indexbackfiller.go @@ -432,8 +432,9 @@ func (ib *indexBackfiller) buildIndexEntryBatch( // TODO(knz): do KV tracing in DistSQL processors. var err error - entries, key, memUsedBuildingBatch, err = ib.BuildIndexEntriesChunk(ctx, txn, ib.desc, sp, - ib.spec.ChunkSize, false /*traceKV*/) + entries, key, memUsedBuildingBatch, err = ib.BuildIndexEntriesChunk( + ctx, txn, ib.desc, sp, ib.spec.ChunkSize, false, /* traceKV */ + ) return err }); err != nil { return nil, nil, 0, err diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index bb170124c5f4..c3f738065d29 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -291,6 +291,7 @@ func newInvertedJoiner( if err := fetcher.Init( flowCtx.EvalCtx.Context, row.FetcherInitArgs{ + Txn: flowCtx.Txn, LockStrength: spec.LockingStrength, LockWaitPolicy: spec.LockingWaitPolicy, LockTimeout: flowCtx.EvalCtx.SessionData().LockTimeout, @@ -496,7 +497,7 @@ func (ij *invertedJoiner) readInput() (invertedJoinerState, *execinfrapb.Produce log.VEventf(ij.Ctx, 1, "scanning %d spans", len(ij.indexSpans)) if err = ij.fetcher.StartScan( - ij.Ctx, ij.FlowCtx.Txn, ij.indexSpans, nil, /* spanIDs */ + ij.Ctx, ij.indexSpans, nil, /* spanIDs */ rowinfra.NoBytesLimit, rowinfra.NoRowLimit, ); err != nil { ij.MoveToDraining(err) diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 9a56e8680c78..65c56c2d1b6e 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -120,9 +119,6 @@ type joinReader struct { shouldLimitBatches bool readerType joinReaderType - keyLocking descpb.ScanLockingStrength - lockWaitPolicy lock.WaitPolicy - // txn is the transaction used by the join reader. txn *kv.Txn @@ -130,14 +126,9 @@ type joinReader struct { // the kvstreamer.Streamer API. usesStreamer bool streamerInfo struct { - *kvstreamer.Streamer unlimitedMemMonitor *mon.BytesMonitor budgetAcc mon.BoundAccount - budgetLimit int64 - singleRowLookup bool - maxKeysPerRow int diskMonitor *mon.BytesMonitor - diskBuffer kvstreamer.ResultDiskBuffer } input execinfra.RowSource @@ -322,8 +313,6 @@ func newJoinReader( outputGroupContinuationForLeftRow: spec.OutputGroupContinuationForLeftRow, shouldLimitBatches: shouldLimitBatches, readerType: readerType, - keyLocking: spec.LockingStrength, - lockWaitPolicy: row.GetWaitPolicy(spec.LockingWaitPolicy), usesStreamer: useStreamer, lookupBatchBytesLimit: rowinfra.BytesLimit(spec.LookupBatchBytesLimit), limitHintHelper: execinfra.MakeLimitHintHelper(spec.LimitHint, post), @@ -396,31 +385,6 @@ func newJoinReader( return nil, err } - var fetcher row.Fetcher - if err := fetcher.Init( - flowCtx.EvalCtx.Context, - row.FetcherInitArgs{ - LockStrength: spec.LockingStrength, - LockWaitPolicy: spec.LockingWaitPolicy, - LockTimeout: flowCtx.EvalCtx.SessionData().LockTimeout, - Alloc: &jr.alloc, - MemMonitor: flowCtx.EvalCtx.Mon, - Spec: &spec.FetchSpec, - TraceKV: flowCtx.TraceKV, - ForceProductionKVBatchSize: flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, - }, - ); err != nil { - return nil, err - } - - if execstats.ShouldCollectStats(flowCtx.EvalCtx.Ctx(), flowCtx.CollectStats) { - jr.input = newInputStatCollector(jr.input) - jr.fetcher = newRowFetcherStatCollector(&fetcher) - jr.ExecStatsForTrace = jr.execStatsForTrace - } else { - jr.fetcher = &fetcher - } - if !spec.LookupExpr.Empty() { lookupExprTypes := make([]*types.T, 0, len(leftTypes)+len(rightTypes)) lookupExprTypes = append(lookupExprTypes, leftTypes...) @@ -466,6 +430,7 @@ func newJoinReader( } jr.batchSizeBytes = jr.strategy.getLookupRowsBatchSizeHint(flowCtx.EvalCtx.SessionData()) + var streamingKVFetcher *row.KVFetcher if jr.usesStreamer { // NOTE: this comment should only be considered in a case of low workmem // limit (which is a testing scenario). @@ -499,7 +464,7 @@ func newJoinReader( // whereas the batch size hint is at most 4MiB, so the streamer will get // at least (depending on the hint) on the order of 52MiB which is // plenty enough. - jr.streamerInfo.budgetLimit = memoryLimit - 3*jr.batchSizeBytes + streamerBudgetLimit := memoryLimit - 3*jr.batchSizeBytes // We need to use an unlimited monitor for the streamer's budget since // the streamer itself is responsible for staying under the limit. jr.streamerInfo.unlimitedMemMonitor = mon.NewMonitorInheritWithLimit( @@ -507,8 +472,31 @@ func newJoinReader( ) jr.streamerInfo.unlimitedMemMonitor.Start(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, mon.BoundAccount{}) jr.streamerInfo.budgetAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount() - jr.streamerInfo.singleRowLookup = readerType == indexJoinReaderType || spec.LookupColumnsAreKey - jr.streamerInfo.maxKeysPerRow = int(jr.fetchSpec.MaxKeysPerRow) + + var diskBuffer kvstreamer.ResultDiskBuffer + if jr.maintainOrdering { + jr.streamerInfo.diskMonitor = execinfra.NewMonitor( + jr.Ctx, jr.FlowCtx.DiskMonitor, "streamer-disk", /* name */ + ) + diskBuffer = rowcontainer.NewKVStreamerResultDiskBuffer( + jr.FlowCtx.Cfg.TempStorage, jr.streamerInfo.diskMonitor, + ) + } + singleRowLookup := readerType == indexJoinReaderType || spec.LookupColumnsAreKey + streamingKVFetcher = row.NewStreamingKVFetcher( + flowCtx.Cfg.DistSender, + flowCtx.Stopper(), + jr.txn, + flowCtx.EvalCtx.Settings, + spec.LockingWaitPolicy, + spec.LockingStrength, + streamerBudgetLimit, + &jr.streamerInfo.budgetAcc, + spec.MaintainOrdering, + singleRowLookup, + int(spec.FetchSpec.MaxKeysPerRow), + diskBuffer, + ) } else { // When not using the Streamer API, we want to limit the batch size hint // to at most half of the workmem limit. Note that it is ok if it is set @@ -519,6 +507,33 @@ func newJoinReader( } } + var fetcher row.Fetcher + if err := fetcher.Init( + flowCtx.EvalCtx.Context, + row.FetcherInitArgs{ + StreamingKVFetcher: streamingKVFetcher, + Txn: jr.txn, + LockStrength: spec.LockingStrength, + LockWaitPolicy: spec.LockingWaitPolicy, + LockTimeout: flowCtx.EvalCtx.SessionData().LockTimeout, + Alloc: &jr.alloc, + MemMonitor: flowCtx.EvalCtx.Mon, + Spec: &spec.FetchSpec, + TraceKV: flowCtx.TraceKV, + ForceProductionKVBatchSize: flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, + }, + ); err != nil { + return nil, err + } + + if execstats.ShouldCollectStats(flowCtx.EvalCtx.Ctx(), flowCtx.CollectStats) { + jr.input = newInputStatCollector(jr.input) + jr.fetcher = newRowFetcherStatCollector(&fetcher) + jr.ExecStatsForTrace = jr.execStatsForTrace + } else { + jr.fetcher = &fetcher + } + // TODO(radu): verify the input types match the index key types return jr, nil } @@ -941,18 +956,8 @@ func (jr *joinReader) readInput() ( // modification here, but we want to be conscious about the memory // accounting - we don't double count for any memory of spans because the // joinReaderStrategy doesn't account for any memory used by the spans. - if jr.usesStreamer { - var kvBatchFetcher *row.TxnKVStreamer - kvBatchFetcher, err = row.NewTxnKVStreamer( - jr.Ctx, jr.streamerInfo.Streamer, spans, spanIDs, jr.keyLocking, - ) - if err != nil { - jr.MoveToDraining(err) - return jrStateUnknown, nil, jr.DrainHelper() - } - err = jr.fetcher.StartScanFrom(jr.Ctx, kvBatchFetcher) - } else { - var bytesLimit rowinfra.BytesLimit + var bytesLimit rowinfra.BytesLimit + if !jr.usesStreamer { if !jr.shouldLimitBatches { bytesLimit = rowinfra.NoBytesLimit } else { @@ -961,11 +966,10 @@ func (jr *joinReader) readInput() ( bytesLimit = rowinfra.GetDefaultBatchBytesLimit(jr.EvalCtx.TestingKnobs.ForceProductionValues) } } - err = jr.fetcher.StartScan( - jr.Ctx, jr.txn, spans, spanIDs, bytesLimit, rowinfra.NoRowLimit, - ) } - if err != nil { + if err = jr.fetcher.StartScan( + jr.Ctx, spans, spanIDs, bytesLimit, rowinfra.NoRowLimit, + ); err != nil { jr.MoveToDraining(err) return jrStateUnknown, nil, jr.DrainHelper() } @@ -1028,7 +1032,7 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet bytesLimit = rowinfra.NoBytesLimit } if err := jr.fetcher.StartScan( - jr.Ctx, jr.txn, spans, spanIDs, bytesLimit, rowinfra.NoRowLimit, + jr.Ctx, spans, spanIDs, bytesLimit, rowinfra.NoRowLimit, ); err != nil { jr.MoveToDraining(err) return jrStateUnknown, jr.DrainHelper() @@ -1070,36 +1074,6 @@ func (jr *joinReader) performMemoryAccounting() error { func (jr *joinReader) Start(ctx context.Context) { ctx = jr.StartInternal(ctx, joinReaderProcName) jr.input.Start(ctx) - if jr.usesStreamer { - jr.streamerInfo.Streamer = kvstreamer.NewStreamer( - jr.FlowCtx.Cfg.DistSender, - jr.FlowCtx.Stopper(), - jr.txn, - jr.FlowCtx.EvalCtx.Settings, - jr.lockWaitPolicy, - jr.streamerInfo.budgetLimit, - &jr.streamerInfo.budgetAcc, - ) - mode := kvstreamer.OutOfOrder - if jr.maintainOrdering { - mode = kvstreamer.InOrder - jr.streamerInfo.diskMonitor = execinfra.NewMonitor( - ctx, jr.FlowCtx.DiskMonitor, "streamer-disk", /* name */ - ) - jr.streamerInfo.diskBuffer = rowcontainer.NewKVStreamerResultDiskBuffer( - jr.FlowCtx.Cfg.TempStorage, jr.streamerInfo.diskMonitor, - ) - } - jr.streamerInfo.Streamer.Init( - mode, - kvstreamer.Hints{ - UniqueRequests: true, - SingleRowLookup: jr.streamerInfo.singleRowLookup, - }, - jr.streamerInfo.maxKeysPerRow, - jr.streamerInfo.diskBuffer, - ) - } jr.runningState = jrReadingInput } @@ -1115,12 +1089,6 @@ func (jr *joinReader) close() { jr.fetcher.Close(jr.Ctx) } if jr.usesStreamer { - // We have to cleanup the streamer after closing the fetcher because - // the latter might release some memory tracked by the budget of the - // streamer. - if jr.streamerInfo.Streamer != nil { - jr.streamerInfo.Streamer.Close(jr.Ctx) - } jr.streamerInfo.budgetAcc.Close(jr.Ctx) jr.streamerInfo.unlimitedMemMonitor.Stop(jr.Ctx) if jr.streamerInfo.diskMonitor != nil { diff --git a/pkg/sql/rowexec/rowfetcher.go b/pkg/sql/rowexec/rowfetcher.go index 16db8669c924..de123a4c6ec7 100644 --- a/pkg/sql/rowexec/rowfetcher.go +++ b/pkg/sql/rowexec/rowfetcher.go @@ -28,7 +28,7 @@ import ( // collector wrapper can be plugged in. type rowFetcher interface { StartScan( - _ context.Context, _ *kv.Txn, _ roachpb.Spans, spanIDs []int, + _ context.Context, _ roachpb.Spans, spanIDs []int, batchBytesLimit rowinfra.BytesLimit, rowLimitHint rowinfra.RowLimit, ) error StartScanFrom(_ context.Context, _ row.KVBatchFetcher) error diff --git a/pkg/sql/rowexec/stats.go b/pkg/sql/rowexec/stats.go index 48ee23a378dc..1c7f361873d7 100644 --- a/pkg/sql/rowexec/stats.go +++ b/pkg/sql/rowexec/stats.go @@ -94,14 +94,13 @@ func newRowFetcherStatCollector(f *row.Fetcher) *rowFetcherStatCollector { // StartScan is part of the rowFetcher interface. func (c *rowFetcherStatCollector) StartScan( ctx context.Context, - txn *kv.Txn, spans roachpb.Spans, spanIDs []int, batchBytesLimit rowinfra.BytesLimit, limitHint rowinfra.RowLimit, ) error { start := timeutil.Now() - err := c.fetcher.StartScan(ctx, txn, spans, spanIDs, batchBytesLimit, limitHint) + err := c.fetcher.StartScan(ctx, spans, spanIDs, batchBytesLimit, limitHint) c.startScanStallTime += timeutil.Since(start) return err } @@ -127,8 +126,7 @@ func (c *rowFetcherStatCollector) StartInconsistentScan( ) error { start := timeutil.Now() err := c.fetcher.StartInconsistentScan( - ctx, db, initialTimestamp, maxTimestampAge, spans, batchBytesLimit, - limitHint, qualityOfService, + ctx, db, initialTimestamp, maxTimestampAge, spans, batchBytesLimit, limitHint, qualityOfService, ) c.startScanStallTime += timeutil.Since(start) return err diff --git a/pkg/sql/rowexec/tablereader.go b/pkg/sql/rowexec/tablereader.go index d91380902ee3..0d0dcd253879 100644 --- a/pkg/sql/rowexec/tablereader.go +++ b/pkg/sql/rowexec/tablereader.go @@ -148,6 +148,7 @@ func newTableReader( if err := fetcher.Init( flowCtx.EvalCtx.Context, row.FetcherInitArgs{ + Txn: flowCtx.Txn, Reverse: spec.Reverse, LockStrength: spec.LockingStrength, LockWaitPolicy: spec.LockingWaitPolicy, @@ -211,7 +212,7 @@ func (tr *tableReader) startScan(ctx context.Context) error { var err error if tr.maxTimestampAge == 0 { err = tr.fetcher.StartScan( - ctx, tr.FlowCtx.Txn, tr.Spans, nil /* spanIDs */, bytesLimit, tr.limitHint, + ctx, tr.Spans, nil /* spanIDs */, bytesLimit, tr.limitHint, ) } else { initialTS := tr.FlowCtx.Txn.ReadTimestamp() diff --git a/pkg/sql/rowexec/zigzagjoiner.go b/pkg/sql/rowexec/zigzagjoiner.go index df2d86cae951..1dc396461a6f 100644 --- a/pkg/sql/rowexec/zigzagjoiner.go +++ b/pkg/sql/rowexec/zigzagjoiner.go @@ -13,7 +13,6 @@ package rowexec import ( "context" - "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -451,12 +450,11 @@ func (z *zigzagJoiner) setupInfo( info.spanBuilder.InitWithFetchSpec(flowCtx.EvalCtx, flowCtx.Codec(), &info.fetchSpec) // Setup the Fetcher. - // NB: zigzag joins are disabled when a row-level locking clause is - // supplied, so there is no locking strength on ZigzagJoinerSpec. var fetcher row.Fetcher if err := fetcher.Init( flowCtx.EvalCtx.Context, row.FetcherInitArgs{ + Txn: flowCtx.Txn, LockStrength: spec.LockingStrength, LockWaitPolicy: spec.LockingWaitPolicy, LockTimeout: flowCtx.EvalCtx.SessionData().LockTimeout, @@ -614,7 +612,7 @@ func (z *zigzagJoiner) emitFromContainers() (rowenc.EncDatumRow, error) { // nextRow fetches the nextRow to emit from the join. It iterates through all // sides until a match is found then emits the results of the match one result // at a time. -func (z *zigzagJoiner) nextRow(ctx context.Context, txn *kv.Txn) (rowenc.EncDatumRow, error) { +func (z *zigzagJoiner) nextRow(ctx context.Context) (rowenc.EncDatumRow, error) { for { if err := z.cancelChecker.Check(); err != nil { return nil, err @@ -647,7 +645,6 @@ func (z *zigzagJoiner) nextRow(ctx context.Context, txn *kv.Txn) (rowenc.EncDatu err = curInfo.fetcher.StartScan( ctx, - txn, roachpb.Spans{roachpb.Span{Key: curInfo.key, EndKey: curInfo.endKey}}, nil, /* spanIDs */ rowinfra.GetDefaultBatchBytesLimit(z.EvalCtx.TestingKnobs.ForceProductionValues), @@ -788,7 +785,6 @@ func (z *zigzagJoiner) maybeFetchInitialRow() error { curInfo := &z.infos[z.side] err := curInfo.fetcher.StartScan( z.Ctx, - z.FlowCtx.Txn, roachpb.Spans{roachpb.Span{Key: curInfo.key, EndKey: curInfo.endKey}}, nil, /* spanIDs */ rowinfra.GetDefaultBatchBytesLimit(z.EvalCtx.TestingKnobs.ForceProductionValues), @@ -816,7 +812,7 @@ func (z *zigzagJoiner) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata z.MoveToDraining(err) break } - row, err := z.nextRow(z.Ctx, z.FlowCtx.Txn) + row, err := z.nextRow(z.Ctx) if err != nil { z.MoveToDraining(err) break