Skip to content

Commit

Permalink
sql: clean up the lifecycle of fetchers
Browse files Browse the repository at this point in the history
Previously, the lifecycle of different fetcher objects was a mess.
Consider the sequence of fetchers when used by the join reader with the
old non-streamer code path:
`rowexec.joinReader` -> `row.Fetcher` -> `row.KVFetcher` -> `row.txnKVFetcher`.
`row.Fetcher` was initialized once, but then on every call to
`StartScan`, we would create a new `row.txnKVFetcher` and then wrap it
with a new `row.KVFetcher` (during an internal `StartScanFrom` call). In
other words, throughout the lifetime of the join reader, its fetcher
would create a new pair of objects for each input row batch. This setup
is very unintuitive and previously led to some bugs with memory
accounting.

I believe such a setup was created organically, without giving too much
thought to it. Some considerations should be pointed out:
- in some cases, we have some state from the previous fetch that we want
to discard
- in some cases, we provide the `row.Fetcher` with a custom
`KVBatchFetcher` implementation.

This commit refactors all of this stuff to make it much more sane. In
particular, we now only create a single `row.KVFetcher` object that is
powered by a single `row.txnKVFetcher` or `row.txnKVStreamer`
implementation throughout the whole lifetime of `row.Fetcher`. In the
main code path, the callers are now expected to only use `StartScan`
method which correctly discards unnecessary state from the previous
call. This is achieved by adding a new method to `KVBatchFetcher`
interface.

This commit supports the use case with custom `KVBatchFetcher`s too by
asking the caller to explicitly specify a knob during the initialization
of the `row.Fetcher` - in such case, only `StartScanFrom` calls are
allowed. There, we still close the `KVBatchFetcher` from the previous
call (tbh I believe this is not necessary since these custom
`KVBatchFetcher`s don't have anything to clean up, but it's probably
safer to keep the old behavior here).

Furthermore, this commit pushes some arguments from `StartScan` into
`Init` - most notably the txn is now passed only once. However, there
are some use cases (like a column backfill, done in chunks) where the
txn might change throughout the lifetime of the fetcher - we allow
updating it later if needed.

This also allows us to unify the streamer and the non-streamer
code paths - to remove some of the duplicated code as well as push the
usage of the streamer lower in the stack.

Release note: None
  • Loading branch information
yuzefovich committed Jun 23, 2022
1 parent fecfaed commit e7e724e
Show file tree
Hide file tree
Showing 29 changed files with 595 additions and 503 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ ALL_TESTS = [
"//pkg/sql/protoreflect:protoreflect_test",
"//pkg/sql/querycache:querycache_test",
"//pkg/sql/randgen:randgen_test",
"//pkg/sql/row:row_disallowed_imports_test",
"//pkg/sql/row:row_test",
"//pkg/sql/rowcontainer:rowcontainer_test",
"//pkg/sql/rowenc/keyside:keyside_test",
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,9 @@ func (c *rowFetcherCache) RowFetcherForColumnFamily(
if err := rf.Init(
context.TODO(),
row.FetcherInitArgs{
Alloc: &c.a,
Spec: &spec,
WillUseCustomKVBatchFetcher: true,
Alloc: &c.a,
Spec: &spec,
},
); err != nil {
return nil, nil, err
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/cliccl/debug_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,9 @@ func makeRowFetcher(
if err := rf.Init(
ctx,
row.FetcherInitArgs{
Alloc: &tree.DatumAlloc{},
Spec: &spec,
WillUseCustomKVBatchFetcher: true,
Alloc: &tree.DatumAlloc{},
Spec: &spec,
},
); err != nil {
return rf, err
Expand Down
8 changes: 5 additions & 3 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -2586,7 +2586,7 @@ func columnBackfillInTxn(
rowMetrics := execCfg.GetRowMetrics(evalCtx.SessionData().Internal)
var backfiller backfill.ColumnBackfiller
if err := backfiller.InitForLocalUse(
ctx, evalCtx, semaCtx, tableDesc, columnBackfillerMon, rowMetrics, traceKV,
ctx, txn, evalCtx, semaCtx, tableDesc, columnBackfillerMon, rowMetrics, traceKV,
); err != nil {
return err
}
Expand Down Expand Up @@ -2635,8 +2635,10 @@ func indexBackfillInTxn(
sp := tableDesc.PrimaryIndexSpan(evalCtx.Codec)
for sp.Key != nil {
var err error
sp.Key, err = backfiller.RunIndexBackfillChunk(ctx,
txn, tableDesc, sp, indexTxnBackfillChunkSize, false /* alsoCommit */, traceKV)
sp.Key, err = backfiller.RunIndexBackfillChunk(
ctx, txn, tableDesc, sp, indexTxnBackfillChunkSize,
false /* alsoCommit */, traceKV,
)
if err != nil {
return err
}
Expand Down
26 changes: 20 additions & 6 deletions pkg/sql/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ func (cb *ColumnBackfiller) initCols(desc catalog.TableDescriptor) {

// init performs initialization operations that are shared across the local
// and distributed initialization procedures for the ColumnBackfiller.
//
// txn might be nil, in which case it will need to be set on the fetcher later.
func (cb *ColumnBackfiller) init(
txn *kv.Txn,
evalCtx *eval.Context,
defaultExprs []tree.TypedExpr,
computedExprs []tree.TypedExpr,
Expand Down Expand Up @@ -154,6 +157,7 @@ func (cb *ColumnBackfiller) init(
return cb.fetcher.Init(
evalCtx.Context,
row.FetcherInitArgs{
Txn: txn,
Alloc: &cb.alloc,
MemMonitor: cb.mon,
Spec: &spec,
Expand All @@ -167,6 +171,7 @@ func (cb *ColumnBackfiller) init(
// is occurring on the gateway as part of the user's transaction.
func (cb *ColumnBackfiller) InitForLocalUse(
ctx context.Context,
txn *kv.Txn,
evalCtx *eval.Context,
semaCtx *tree.SemaContext,
desc catalog.TableDescriptor,
Expand All @@ -193,7 +198,7 @@ func (cb *ColumnBackfiller) InitForLocalUse(
if err != nil {
return err
}
return cb.init(evalCtx, defaultExprs, computedExprs, desc, mon, rowMetrics, traceKV)
return cb.init(txn, evalCtx, defaultExprs, computedExprs, desc, mon, rowMetrics, traceKV)
}

// InitForDistributedUse initializes a ColumnBackfiller for use as part of a
Expand Down Expand Up @@ -250,7 +255,8 @@ func (cb *ColumnBackfiller) InitForDistributedUse(
flowCtx.Descriptors.ReleaseAll(ctx)

rowMetrics := flowCtx.GetRowMetrics()
return cb.init(evalCtx, defaultExprs, computedExprs, desc, mon, rowMetrics, flowCtx.TraceKV)
// The txn will be set on the fetcher in RunColumnBackfillChunk.
return cb.init(nil /* txn */, evalCtx, defaultExprs, computedExprs, desc, mon, rowMetrics, flowCtx.TraceKV)
}

// Close frees the resources used by the ColumnBackfiller.
Expand Down Expand Up @@ -302,6 +308,12 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
panic("only column data should be modified, but the rowUpdater is configured otherwise")
}

// Update the fetcher to use the new txn.
if err := cb.fetcher.SetTxn(txn); err != nil {
log.Errorf(ctx, "scan error during SetTxn: %s", err)
return roachpb.Key{}, err
}

// Get the next set of rows.
//
// Running the scan and applying the changes in many transactions
Expand All @@ -311,7 +323,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
// populated and deleted by the OLTP commands but not otherwise
// read or used
if err := cb.fetcher.StartScan(
ctx, txn, []roachpb.Span{sp}, nil, /* spanIDs */
ctx, []roachpb.Span{sp}, nil, /* spanIDs */
rowinfra.GetDefaultBatchBytesLimit(false /* forceProductionValue */),
chunkSize,
); err != nil {
Expand Down Expand Up @@ -804,6 +816,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
if err := fetcher.Init(
ib.evalCtx.Context,
row.FetcherInitArgs{
Txn: txn,
Alloc: &ib.alloc,
MemMonitor: ib.mon,
Spec: &spec,
Expand All @@ -814,7 +827,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
}
defer fetcher.Close(ctx)
if err := fetcher.StartScan(
ctx, txn, []roachpb.Span{sp}, nil, /* spanIDs */
ctx, []roachpb.Span{sp}, nil, /* spanIDs */
rowinfra.GetDefaultBatchBytesLimit(false /* forceProductionValue */),
initBufferSize,
); err != nil {
Expand Down Expand Up @@ -980,8 +993,9 @@ func (ib *IndexBackfiller) RunIndexBackfillChunk(
alsoCommit bool,
traceKV bool,
) (roachpb.Key, error) {
entries, key, memUsedBuildingChunk, err := ib.BuildIndexEntriesChunk(ctx, txn, tableDesc, sp,
chunkSize, traceKV)
entries, key, memUsedBuildingChunk, err := ib.BuildIndexEntriesChunk(
ctx, txn, tableDesc, sp, chunkSize, traceKV,
)
if err != nil {
return nil, err
}
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/colfetcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ go_library(
"//pkg/col/typeconv",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvstreamer",
"//pkg/roachpb",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catpb",
Expand Down
118 changes: 28 additions & 90 deletions pkg/sql/colfetcher/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@ import (
"sort"
"strings"
"sync"
"time"

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
Expand All @@ -44,7 +41,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
"github.com/lib/pq/oid"
)
Expand Down Expand Up @@ -165,28 +161,18 @@ func (m colIdxMap) Swap(i, j int) {
}

type cFetcherArgs struct {
// lockStrength represents the row-level locking mode to use when fetching
// rows.
lockStrength descpb.ScanLockingStrength
// lockWaitPolicy represents the policy to be used for handling conflicting
// locks held by other active transactions.
lockWaitPolicy descpb.ScanLockingWaitPolicy
// lockTimeout specifies the maximum amount of time that the fetcher will
// wait while attempting to acquire a lock on a key or while blocking on an
// existing lock in order to perform a non-locking read on a key.
lockTimeout time.Duration
// memoryLimit determines the maximum memory footprint of the output batch.
memoryLimit int64
// estimatedRowCount is the optimizer-derived number of expected rows that
// this fetch will produce, if non-zero.
estimatedRowCount uint64
// reverse denotes whether or not the spans should be read in reverse or not
// when StartScan is invoked.
reverse bool
// traceKV indicates whether or not session tracing is enabled. It is set
// when initializing the fetcher.
traceKV bool
forceProductionKVBatchSize bool
traceKV bool
// singleUse, if true, indicates that the cFetcher will only need to scan a
// single set of spans. This allows the cFetcher to close itself eagerly,
// once it finishes the first fetch.
singleUse bool
}

// noOutputColumn is a sentinel value to denote that a system column is not
Expand Down Expand Up @@ -230,7 +216,7 @@ type cFetcher struct {
fetcher *row.KVFetcher
// bytesRead stores the cumulative number of bytes read by this cFetcher
// throughout its whole existence (i.e. between its construction and
// Release()). It accumulates the bytes read statistic across StartScan* and
// Release()). It accumulates the bytes read statistic across StartScan and
// Close methods.
//
// The field should not be accessed directly by the users of the cFetcher -
Expand Down Expand Up @@ -287,10 +273,6 @@ type cFetcher struct {

accountingHelper colmem.SetAccountingHelper

// kvFetcherMemAcc is a memory account that will be used by the underlying
// KV fetcher.
kvFetcherMemAcc *mon.BoundAccount

// maxCapacity if non-zero indicates the target capacity of the output
// batch. It is set when at the row finalization we realize that the output
// batch has exceeded the memory limit.
Expand Down Expand Up @@ -344,15 +326,14 @@ func (cf *cFetcher) resetBatch() {
}
}

// Init sets up a Fetcher based on the table args. Only columns present in
// Init sets up the cFetcher based on the table args. Only columns present in
// tableArgs.cols will be fetched.
func (cf *cFetcher) Init(
allocator *colmem.Allocator, kvFetcherMemAcc *mon.BoundAccount, tableArgs *cFetcherTableArgs,
allocator *colmem.Allocator, kvFetcher *row.KVFetcher, tableArgs *cFetcherTableArgs,
) error {
if tableArgs.spec.Version != descpb.IndexFetchSpecVersionInitial {
return errors.Newf("unsupported IndexFetchSpec version %d", tableArgs.spec.Version)
}
cf.kvFetcherMemAcc = kvFetcherMemAcc
table := newCTableInfo()
nCols := tableArgs.ColIdxMap.Len()
if cap(table.orderedColIdxMap.vals) < nCols {
Expand Down Expand Up @@ -482,33 +463,22 @@ func (cf *cFetcher) Init(
}

cf.table = table
cf.fetcher = kvFetcher
cf.accountingHelper.Init(allocator, cf.table.typs)

return nil
}

//gcassert:inline
func (cf *cFetcher) setFetcher(f *row.KVFetcher, limitHint rowinfra.RowLimit) {
cf.fetcher = f
cf.machine.lastRowPrefix = nil
cf.machine.limitHint = int(limitHint)
cf.machine.state[0] = stateResetBatch
cf.machine.state[1] = stateInitFetch
}

// StartScan initializes and starts the key-value scan. Can be used multiple
// times.
// StartScan initializes and starts the key-value scan. Can only be used
// multiple times if cFetcherArgs.singleUse was set to false in Init().
//
// The fetcher takes ownership of the spans slice - it can modify the slice and
// will perform the memory accounting accordingly. The caller can only reuse the
// spans slice after the fetcher has been closed (which happens when the fetcher
// emits the first zero batch), and if the caller does, it becomes responsible
// for the memory accounting.
// spans slice after the fetcher emits a zero-length batch, and if the caller
// does, it becomes responsible for the memory accounting.
func (cf *cFetcher) StartScan(
ctx context.Context,
txn *kv.Txn,
spans roachpb.Spans,
bsHeader *roachpb.BoundedStalenessHeader,
limitBatches bool,
batchBytesLimit rowinfra.BytesLimit,
limitHint rowinfra.RowLimit,
Expand Down Expand Up @@ -545,49 +515,13 @@ func (cf *cFetcher) StartScan(
firstBatchLimit = rowinfra.KeyLimit(int(limitHint) * int(cf.table.spec.MaxKeysPerRow))
}

f, err := row.NewKVFetcher(
ctx,
txn,
spans,
nil, /* spanIDs */
bsHeader,
cf.reverse,
batchBytesLimit,
firstBatchLimit,
cf.lockStrength,
cf.lockWaitPolicy,
cf.lockTimeout,
cf.kvFetcherMemAcc,
cf.forceProductionKVBatchSize,
cf.machine.lastRowPrefix = nil
cf.machine.limitHint = int(limitHint)
cf.machine.state[0] = stateResetBatch
cf.machine.state[1] = stateInitFetch
return cf.fetcher.SetupNextFetch(
ctx, spans, nil /* spanIDs */, batchBytesLimit, firstBatchLimit,
)
if err != nil {
return err
}
cf.setFetcher(f, limitHint)
return nil
}

// StartScanStreaming initializes and starts the key-value scan using the
// Streamer API. Can be used multiple times.
//
// The fetcher takes ownership of the spans slice - it can modify the slice and
// will perform the memory accounting accordingly. The caller can only reuse the
// spans slice after the fetcher has been closed (which happens when the fetcher
// emits the first zero batch), and if the caller does, it becomes responsible
// for the memory accounting.
func (cf *cFetcher) StartScanStreaming(
ctx context.Context,
streamer *kvstreamer.Streamer,
spans roachpb.Spans,
limitHint rowinfra.RowLimit,
) error {
kvBatchFetcher, err := row.NewTxnKVStreamer(ctx, streamer, spans, nil /* spanIDs */, cf.lockStrength)
if err != nil {
return err
}
f := row.NewKVStreamingFetcher(kvBatchFetcher)
cf.setFetcher(f, limitHint)
return nil
}

// fetcherState is the state enum for NextBatch.
Expand Down Expand Up @@ -954,13 +888,17 @@ func (cf *cFetcher) NextBatch(ctx context.Context) (coldata.Batch, error) {
case stateEmitLastBatch:
cf.machine.state[0] = stateFinished
cf.finalizeBatch()
// Close the fetcher eagerly so that its memory could be GCed.
cf.Close(ctx)
if cf.singleUse {
// Close the fetcher eagerly so that its memory could be GCed.
cf.Close(ctx)
}
return cf.machine.batch, nil

case stateFinished:
// Close the fetcher eagerly so that its memory could be GCed.
cf.Close(ctx)
if cf.singleUse {
// Close the fetcher eagerly so that its memory could be GCed.
cf.Close(ctx)
}
return coldata.ZeroBatch, nil
}
}
Expand Down Expand Up @@ -1348,7 +1286,7 @@ func (cf *cFetcher) convertFetchError(ctx context.Context, err error) error {

// getBytesRead returns the number of bytes read by the cFetcher throughout its
// existence so far. This number accumulates the bytes read statistic across
// StartScan* and Close methods.
// StartScan and Close methods.
func (cf *cFetcher) getBytesRead() int64 {
if cf.fetcher != nil {
cf.bytesRead += cf.fetcher.ResetBytesRead()
Expand Down
Loading

0 comments on commit e7e724e

Please sign in to comment.