From b06375dac3327d9cd3a8db00e0570211d25016fb Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 18 Sep 2023 06:08:50 -0700 Subject: [PATCH 1/3] flowinfra: fix recently introduced minor bug This commit fixes a minor bug introduced in #110625. In particular, that PR made so that we now unconditionally call `ConsumerClosed` on the "head" processor to make sure that resources are properly closed in the pausable portal model. However, `ConsumerClosed` is only valid to be called only if `Start` was called, so this commit fixes that. Additionally, to de-risk #110625 further we now only call that method for local flows since pausable portals currently disable DistSQL. Epic: None Release note: None --- pkg/sql/flowinfra/flow.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index be2106cb3629..bd5e4512e76f 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -214,6 +214,10 @@ type FlowBase struct { // goroutines. startedGoroutines bool + // headProcStarted tracks whether Start was called on the "head" processor + // in Run. + headProcStarted bool + // inboundStreams are streams that receive data from other hosts; this map // is to be passed to FlowRegistry.RegisterFlow. This map is populated in // Flow.Setup(), so it is safe to lookup into concurrently later. @@ -571,6 +575,7 @@ func (f *FlowBase) Run(ctx context.Context, noWait bool) { } f.resumeCtx = ctx log.VEventf(ctx, 1, "running %T in the flow's goroutine", headProc) + f.headProcStarted = true headProc.Run(ctx, headOutput) } @@ -661,17 +666,26 @@ func (f *FlowBase) GetOnCleanupFns() (startCleanup, endCleanup func()) { // ConsumerClosed on the source (i.e. the "head" processor). // // The method is only called if: +// - the flow is local (pausable portals currently don't support DistSQL) // - there is exactly 1 processor in the flow that runs in its own goroutine // (which is always the case for pausable portal model at this time) +// - Start was called on that processor (ConsumerClosed is only valid to be +// called after Start) // - that single processor implements execinfra.RowSource interface (those // processors that don't implement it shouldn't be running through pausable // portal model). // // Otherwise, this method is a noop. func (f *FlowBase) ConsumerClosedOnHeadProc() { + if !f.IsLocal() { + return + } if len(f.processors) != 1 { return } + if !f.headProcStarted { + return + } rs, ok := f.processors[0].(execinfra.RowSource) if !ok { return From 50cd55ec3f00e395932c6fdafa6e77475a2fe6b5 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Tue, 19 Sep 2023 13:21:09 -0400 Subject: [PATCH 2/3] storage: fix invariant Value assertion Calling Iterator.Value is only permitted on a valid iterator positioned over a point key. Previously the storage package assertions could attempt to read the Value of an iterator positioned only at a range key start bound. Epic: none Fix #110771. Release note: None --- pkg/storage/engine.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 64931cba2d14..7cbb402d5d82 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -1945,17 +1945,21 @@ func assertMVCCIteratorInvariants(iter MVCCIterator) error { return errors.AssertionFailedf("unknown type for engine key %s", engineKey) } - // Value must equal UnsafeValue. - u, err := iter.UnsafeValue() - if err != nil { - return err - } - v, err := iter.Value() - if err != nil { - return err - } - if !bytes.Equal(v, u) { - return errors.AssertionFailedf("Value %x does not match UnsafeValue %x at %s", v, u, key) + // If the iterator position has a point key, Value must equal UnsafeValue. + // NB: It's only valid to read an iterator's Value if the iterator is + // positioned at a point key. + if hasPoint, _ := iter.HasPointAndRange(); hasPoint { + u, err := iter.UnsafeValue() + if err != nil { + return err + } + v, err := iter.Value() + if err != nil { + return err + } + if !bytes.Equal(v, u) { + return errors.AssertionFailedf("Value %x does not match UnsafeValue %x at %s", v, u, key) + } } // For prefix iterators, any range keys must be point-sized. We've already From 252054193de8381def95fd63c67f1cedfe56c4d1 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 19 Sep 2023 10:59:52 -0400 Subject: [PATCH 3/3] storage: rename MaxIntentsPerLockConflictError to MaxConflictsPerLockConflictError Informs #110902. This commit is a broad rename of "max intents" to "max lock conflicts", to better reflect the fact that locks of any strength can be returned in LockConflictErrors. The semantics of which locks conflict with which operations are unchanged. Release note: None --- pkg/kv/kvserver/batcheval/cmd_add_sstable.go | 6 +- pkg/kv/kvserver/batcheval/cmd_clear_range.go | 4 +- pkg/kv/kvserver/batcheval/cmd_delete_range.go | 6 +- pkg/kv/kvserver/batcheval/cmd_export.go | 8 +-- pkg/kv/kvserver/batcheval/cmd_reverse_scan.go | 2 +- pkg/kv/kvserver/batcheval/cmd_scan.go | 2 +- pkg/kv/kvserver/replica_read.go | 6 +- pkg/kv/kvserver/store_test.go | 16 ++--- pkg/storage/engine.go | 4 +- pkg/storage/engine_test.go | 4 +- pkg/storage/metamorphic/operations.go | 2 +- pkg/storage/mvcc.go | 69 +++++++++---------- pkg/storage/mvcc_history_test.go | 6 +- pkg/storage/mvcc_incremental_iterator_test.go | 2 +- pkg/storage/mvcc_stats_test.go | 8 +-- pkg/storage/mvcc_test.go | 8 +-- pkg/storage/pebble_mvcc_scanner.go | 14 ++-- pkg/storage/sst.go | 6 +- pkg/storage/sst_test.go | 22 +++--- pkg/storage/testdata/mvcc_histories/export | 4 +- .../mvcc_histories/export_fingerprint | 4 +- 21 files changed, 101 insertions(+), 102 deletions(-) diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go index 18a84383ceb9..fa0dac2b40d3 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go @@ -222,7 +222,7 @@ func EvalAddSSTable( } var statsDelta enginepb.MVCCStats - maxIntents := storage.MaxIntentsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV) + maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV) checkConflicts := args.DisallowConflicts || args.DisallowShadowing || !args.DisallowShadowingBelow.IsEmpty() if checkConflicts { @@ -258,7 +258,7 @@ func EvalAddSSTable( log.VEventf(ctx, 2, "checking conflicts for SSTable [%s,%s)", start.Key, end.Key) statsDelta, err = storage.CheckSSTConflicts(ctx, sst, readWriter, start, end, leftPeekBound, rightPeekBound, - args.DisallowShadowing, args.DisallowShadowingBelow, sstTimestamp, maxIntents, usePrefixSeek) + args.DisallowShadowing, args.DisallowShadowingBelow, sstTimestamp, maxLockConflicts, usePrefixSeek) statsDelta.Add(sstReqStatsDelta) if err != nil { return result.Result{}, errors.Wrap(err, "checking for key collisions") @@ -269,7 +269,7 @@ func EvalAddSSTable( // caller is expected to make sure there are no writers across the span, // and thus no or few locks, so this is cheap in the common case. log.VEventf(ctx, 2, "checking conflicting locks for SSTable [%s,%s)", start.Key, end.Key) - locks, err := storage.ScanLocks(ctx, readWriter, start.Key, end.Key, maxIntents, 0) + locks, err := storage.ScanLocks(ctx, readWriter, start.Key, end.Key, maxLockConflicts, 0) if err != nil { return result.Result{}, errors.Wrap(err, "scanning locks") } else if len(locks) > 0 { diff --git a/pkg/kv/kvserver/batcheval/cmd_clear_range.go b/pkg/kv/kvserver/batcheval/cmd_clear_range.go index b725058e9484..f3c3d0c742c2 100644 --- a/pkg/kv/kvserver/batcheval/cmd_clear_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_clear_range.go @@ -112,8 +112,8 @@ func ClearRange( // txns. Otherwise, txn recovery would fail to find these intents and // consider the txn incomplete, uncommitting it and its writes (even those // outside of the cleared range). - maxIntents := storage.MaxIntentsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV) - locks, err := storage.ScanLocks(ctx, readWriter, from, to, maxIntents, 0) + maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV) + locks, err := storage.ScanLocks(ctx, readWriter, from, to, maxLockConflicts, 0) if err != nil { return result.Result{}, err } else if len(locks) > 0 { diff --git a/pkg/kv/kvserver/batcheval/cmd_delete_range.go b/pkg/kv/kvserver/batcheval/cmd_delete_range.go index 8158507d0c3f..cb805b803375 100644 --- a/pkg/kv/kvserver/batcheval/cmd_delete_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_delete_range.go @@ -152,7 +152,7 @@ func DeleteRange( leftPeekBound, rightPeekBound := rangeTombstonePeekBounds( args.Key, args.EndKey, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey()) - maxIntents := storage.MaxIntentsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV) + maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV) // If no predicate parameters are passed, use the fast path. If we're // deleting the entire Raft range, use an even faster path that avoids a @@ -167,7 +167,7 @@ func DeleteRange( } if err := storage.MVCCDeleteRangeUsingTombstone(ctx, readWriter, cArgs.Stats, args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound, - args.IdempotentTombstone, maxIntents, statsCovered); err != nil { + args.IdempotentTombstone, maxLockConflicts, statsCovered); err != nil { return result.Result{}, err } var res result.Result @@ -197,7 +197,7 @@ func DeleteRange( resumeSpan, err := storage.MVCCPredicateDeleteRange(ctx, readWriter, cArgs.Stats, args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound, args.Predicates, h.MaxSpanRequestKeys, maxDeleteRangeBatchBytes, - defaultRangeTombstoneThreshold, maxIntents) + defaultRangeTombstoneThreshold, maxLockConflicts) if err != nil { return result.Result{}, err } diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index 259c03756926..1ed47e7c3c2d 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -154,9 +154,9 @@ func evalExport( maxSize = targetSize + uint64(allowedOverage) } - var maxIntents uint64 - if m := storage.MaxIntentsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV); m > 0 { - maxIntents = uint64(m) + var maxLockConflicts uint64 + if m := storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV); m > 0 { + maxLockConflicts = uint64(m) } // Only use resume timestamp if splitting mid key is enabled. @@ -184,7 +184,7 @@ func evalExport( ExportAllRevisions: exportAllRevisions, TargetSize: targetSize, MaxSize: maxSize, - MaxIntents: maxIntents, + MaxLockConflicts: maxLockConflicts, StopMidKey: args.SplitMidKey, } var summary kvpb.BulkOpSummary diff --git a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go index 18115de51ba9..6deb362cf52c 100644 --- a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go @@ -46,7 +46,7 @@ func ReverseScan( ScanStats: cArgs.ScanStats, Uncertainty: cArgs.Uncertainty, MaxKeys: h.MaxSpanRequestKeys, - MaxIntents: storage.MaxIntentsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), + MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), TargetBytes: h.TargetBytes, AllowEmpty: h.AllowEmpty, WholeRowsOfSize: h.WholeRowsOfSize, diff --git a/pkg/kv/kvserver/batcheval/cmd_scan.go b/pkg/kv/kvserver/batcheval/cmd_scan.go index 0307058698ed..11902ec2129b 100644 --- a/pkg/kv/kvserver/batcheval/cmd_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_scan.go @@ -46,7 +46,7 @@ func Scan( ScanStats: cArgs.ScanStats, Uncertainty: cArgs.Uncertainty, MaxKeys: h.MaxSpanRequestKeys, - MaxIntents: storage.MaxIntentsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), + MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), TargetBytes: h.TargetBytes, AllowEmpty: h.AllowEmpty, WholeRowsOfSize: h.WholeRowsOfSize, diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index 46f63da881ed..954f6c14e6de 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -303,7 +303,7 @@ func (r *Replica) canDropLatchesBeforeEval( ctx, 3, "can drop latches early for batch (%v); scanning lock table first to detect conflicts", ba, ) - maxIntents := storage.MaxIntentsPerLockConflictError.Get(&r.store.cfg.Settings.SV) + maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&r.store.cfg.Settings.SV) var intents []roachpb.Intent // Check if any of the requests within the batch need to resolve any intents // or if any of them need to use an intent interleaving iterator. @@ -315,7 +315,7 @@ func (r *Replica) canDropLatchesBeforeEval( txnID = ba.Txn.ID } needsIntentInterleavingForThisRequest, err := storage.ScanConflictingIntentsForDroppingLatchesEarly( - ctx, rw, txnID, ba.Header.Timestamp, start, end, &intents, maxIntents, + ctx, rw, txnID, ba.Header.Timestamp, start, end, &intents, maxLockConflicts, ) if err != nil { return false /* ok */, true /* stillNeedsIntentInterleaving */, kvpb.NewError( @@ -323,7 +323,7 @@ func (r *Replica) canDropLatchesBeforeEval( ) } stillNeedsIntentInterleaving = stillNeedsIntentInterleaving || needsIntentInterleavingForThisRequest - if maxIntents != 0 && int64(len(intents)) >= maxIntents { + if maxLockConflicts != 0 && int64(len(intents)) >= maxLockConflicts { break } } diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 9d3db54f95ff..3783475b0e7c 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -2306,9 +2306,9 @@ func TestStoreScanIntents(t *testing.T) { // limits. // // The test proceeds as follows: a writer lays down more than -// `MaxIntentsPerLockConflictError` intents, and a reader is expected to +// `MaxConflictsPerLockConflictError` intents, and a reader is expected to // encounter these intents and raise a `LockConflictError` with exactly -// `MaxIntentsPerLockConflictError` intents in the error. +// `MaxConflictsPerLockConflictError` intents in the error. func TestStoreScanIntentsRespectsLimit(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -2331,10 +2331,10 @@ func TestStoreScanIntentsRespectsLimit(t *testing.T) { ctx context.Context, ba *kvpb.BatchRequest, pErr *kvpb.Error, ) { if errors.HasType(pErr.GoError(), (*kvpb.LockConflictError)(nil)) { - // Assert that the LockConflictError has MaxIntentsPerLockConflictError intents. + // Assert that the LockConflictError has MaxConflictsPerLockConflictError intents. if trap := interceptLockConflictErrors.Load(); trap != nil && trap.(bool) { require.Equal( - t, storage.MaxIntentsPerLockConflictErrorDefault, + t, storage.MaxConflictsPerLockConflictErrorDefault, len(pErr.GetDetail().(*kvpb.LockConflictError).Locks), ) interceptLockConflictErrors.Store(false) @@ -2356,13 +2356,13 @@ func TestStoreScanIntentsRespectsLimit(t *testing.T) { var wg sync.WaitGroup wg.Add(2) - // Lay down more than `MaxIntentsPerLockConflictErrorDefault` intents. + // Lay down more than `MaxConflictsPerLockConflictErrorDefault` intents. go func() { defer wg.Done() txn := newTransaction( "test", roachpb.Key("test-key"), roachpb.NormalUserPriority, tc.Server(0).Clock(), ) - for j := 0; j < storage.MaxIntentsPerLockConflictErrorDefault+10; j++ { + for j := 0; j < storage.MaxConflictsPerLockConflictErrorDefault+10; j++ { var key roachpb.Key key = append(key, keys.ScratchRangeMin...) key = append(key, []byte(fmt.Sprintf("%d", j))...) @@ -2385,10 +2385,10 @@ func TestStoreScanIntentsRespectsLimit(t *testing.T) { } // Now, expect a conflicting reader to encounter the intents and raise a - // LockConflictError with exactly `MaxIntentsPerLockConflictErrorDefault` + // LockConflictError with exactly `MaxConflictsPerLockConflictErrorDefault` // intents. See the TestingConcurrencyRetryFilter above. var ba kv.Batch - for i := 0; i < storage.MaxIntentsPerLockConflictErrorDefault+10; i += 10 { + for i := 0; i < storage.MaxConflictsPerLockConflictErrorDefault+10; i += 10 { for _, key := range intentKeys[i : i+10] { args := getArgs(key) ba.AddRawRequest(&args) diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 64931cba2d14..29a1530e7fed 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -1994,7 +1994,7 @@ func ScanConflictingIntentsForDroppingLatchesEarly( ts hlc.Timestamp, start, end roachpb.Key, intents *[]roachpb.Intent, - maxIntents int64, + maxLockConflicts int64, ) (needIntentHistory bool, err error) { if err := ctx.Err(); err != nil { return false, err @@ -2031,7 +2031,7 @@ func ScanConflictingIntentsForDroppingLatchesEarly( var meta enginepb.MVCCMetadata var ok bool for ok, err = iter.SeekEngineKeyGE(EngineKey{Key: ltStart}); ok; ok, err = iter.NextEngineKey() { - if maxIntents != 0 && int64(len(*intents)) >= maxIntents { + if maxLockConflicts != 0 && int64(len(*intents)) >= maxLockConflicts { // Return early if we're done accumulating intents; make no claims about // not needing intent history. return true /* needsIntentHistory */, nil diff --git a/pkg/storage/engine_test.go b/pkg/storage/engine_test.go index 2748166a7d4e..c0a3c7e21289 100644 --- a/pkg/storage/engine_test.go +++ b/pkg/storage/engine_test.go @@ -2264,7 +2264,7 @@ func TestScanConflictingIntentsForDroppingLatchesEarly(t *testing.T) { tc.start, tc.end, &intents, - 0, /* maxIntents */ + 0, /* maxLockConflicts */ ) if tc.expErr != "" { require.Error(t, err) @@ -2486,7 +2486,7 @@ func TestScanConflictingIntentsForDroppingLatchesEarlyReadYourOwnWrites(t *testi keyA, nil, &intents, - 0, /* maxIntents */ + 0, /* maxLockConflicts */ ) require.NoError(t, err) if alwaysFallbackToIntentInterleavingIteratorForReadYourOwnWrites { diff --git a/pkg/storage/metamorphic/operations.go b/pkg/storage/metamorphic/operations.go index 23775952d99c..b65ad888de00 100644 --- a/pkg/storage/metamorphic/operations.go +++ b/pkg/storage/metamorphic/operations.go @@ -335,7 +335,7 @@ func (m mvccDeleteRangeUsingRangeTombstoneOp) run(ctx context.Context) string { } err := storage.MVCCDeleteRangeUsingTombstone(ctx, writer, nil, m.key, m.endKey, m.ts, - hlc.ClockTimestamp{}, m.key, m.endKey, false /* idempotent */, math.MaxInt64, /* maxIntents */ + hlc.ClockTimestamp{}, m.key, m.endKey, false /* idempotent */, math.MaxInt64, /* maxLockConflicts */ nil /* msCovered */) if err != nil { return fmt.Sprintf("error: %s", err) diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 2afb6fd18e1c..597d1b71fdf2 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -61,12 +61,11 @@ const ( // minimum total for a single store node must be under 2048 for Windows // compatibility. MinimumMaxOpenFiles = 1700 - // MaxIntentsPerLockConflictErrorDefault is the default value for maximum - // number of intents reported by ExportToSST and Scan operations in - // LockConflictError is set to half of the maximum lock table size. This - // value is subject to tuning in real environment as we have more data - // available. - MaxIntentsPerLockConflictErrorDefault = 5000 + // MaxConflictsPerLockConflictErrorDefault is the default value for maximum + // number of locks reported by ExportToSST and Scan operations in + // LockConflictError is set to half of the maximum lock table size. This value + // is subject to tuning in real environment as we have more data available. + MaxConflictsPerLockConflictErrorDefault = 5000 ) var minWALSyncInterval = settings.RegisterDurationSetting( @@ -108,15 +107,15 @@ func CanUseMVCCRangeTombstones(ctx context.Context, st *cluster.Settings) bool { MVCCRangeTombstonesEnabledInMixedClusters.Get(&st.SV) } -// MaxIntentsPerLockConflictError sets maximum number of intents returned in -// LockConflictError in operations that return multiple intents per error. +// MaxConflictsPerLockConflictError sets maximum number of locks returned in +// LockConflictError in operations that return multiple locks per error. // Currently it is used in Scan, ReverseScan, and ExportToSST. -// TODO(nvanbenschoten): rename to MaxLocksPerLockConflictError. -var MaxIntentsPerLockConflictError = settings.RegisterIntSetting( +var MaxConflictsPerLockConflictError = settings.RegisterIntSetting( settings.TenantWritable, "storage.mvcc.max_intents_per_error", - "maximum number of intents returned in error during export of scan requests", - MaxIntentsPerLockConflictErrorDefault, + "maximum number of locks returned in error during export or scan requests", + MaxConflictsPerLockConflictErrorDefault, + settings.WithName("storage.mvcc.max_conflicts_per_lock_conflict_error"), ) var rocksdbConcurrency = envutil.EnvOrDefaultInt( @@ -3274,7 +3273,7 @@ func MVCCDeleteRange( // // This operation is non-transactional, but will check for existing intents in // the target key span, regardless of timestamp, and return a LockConflictError -// containing up to maxIntents intents. +// containing up to maxLockConflicts locks. // // MVCCPredicateDeleteRange will return with a resumeSpan if the number of tombstones // written exceeds maxBatchSize or the size of the written tombstones exceeds maxByteSize. @@ -3307,7 +3306,7 @@ func MVCCPredicateDeleteRange( predicates kvpb.DeleteRangePredicates, maxBatchSize, maxBatchByteSize int64, rangeTombstoneThreshold int64, - maxIntents int64, + maxLockConflicts int64, ) (*roachpb.Span, error) { if maxBatchSize == 0 { @@ -3337,7 +3336,7 @@ func MVCCPredicateDeleteRange( } // Check for any overlapping locks, and return them to be resolved. - if locks, err := ScanLocks(ctx, rw, startKey, endKey, maxIntents, 0); err != nil { + if locks, err := ScanLocks(ctx, rw, startKey, endKey, maxLockConflicts, 0); err != nil { return nil, err } else if len(locks) > 0 { return nil, &kvpb.LockConflictError{Locks: locks} @@ -3435,7 +3434,7 @@ func MVCCPredicateDeleteRange( batchByteSize+runByteSize >= maxBatchByteSize { if err := MVCCDeleteRangeUsingTombstone(ctx, rw, ms, runStart, runEnd.Next(), endTime, localTimestamp, leftPeekBound, rightPeekBound, - false /* idempotent */, maxIntents, nil); err != nil { + false /* idempotent */, maxLockConflicts, nil); err != nil { return err } batchByteSize += int64(MVCCRangeKey{StartKey: runStart, EndKey: runEnd, Timestamp: endTime}.EncodedSize()) @@ -3576,8 +3575,8 @@ func MVCCPredicateDeleteRange( // MVCCDeleteRangeUsingTombstone deletes the given MVCC keyspan at the given // timestamp using an MVCC range tombstone (rather than MVCC point tombstones). // This operation is non-transactional, but will check for existing intents and -// return a LockConflictError containing up to maxIntents intents. Can't be used -// across local keyspace. +// return a LockConflictError containing up to maxLockConflicts locks. Can't be +// used across local keyspace. // // The leftPeekBound and rightPeekBound parameters are used when looking for // range tombstones that we'll merge or overlap with. These are provided to @@ -3609,7 +3608,7 @@ func MVCCDeleteRangeUsingTombstone( localTimestamp hlc.ClockTimestamp, leftPeekBound, rightPeekBound roachpb.Key, idempotent bool, - maxIntents int64, + maxLockConflicts int64, msCovered *enginepb.MVCCStats, ) error { // Validate the range key. We must do this first, to catch e.g. any bound violations. @@ -3642,7 +3641,7 @@ func MVCCDeleteRangeUsingTombstone( } // Check for any overlapping locks, and return them to be resolved. - if locks, err := ScanLocks(ctx, rw, startKey, endKey, maxIntents, 0); err != nil { + if locks, err := ScanLocks(ctx, rw, startKey, endKey, maxLockConflicts, 0); err != nil { return err } else if len(locks) > 0 { return &kvpb.LockConflictError{Locks: locks} @@ -3955,7 +3954,7 @@ func mvccScanInit( targetBytes: opts.TargetBytes, allowEmpty: opts.AllowEmpty, wholeRows: opts.WholeRowsOfSize > 1, // single-KV rows don't need processing - maxIntents: opts.MaxIntents, + maxLockConflicts: opts.MaxLockConflicts, inconsistent: opts.Inconsistent, skipLocked: opts.SkipLocked, tombstones: opts.Tombstones, @@ -4154,12 +4153,12 @@ type MVCCScanOptions struct { // and AllowEmpty is false, in which case the remaining KV pairs of the row // will be fetched and returned too. WholeRowsOfSize int32 - // MaxIntents is a maximum number of intents collected by scanner in - // consistent mode before returning LockConflictError. + // MaxLockConflicts is a maximum number of locks (intents) collected by + // scanner in consistent mode before returning LockConflictError. // // Not used in inconsistent scans. // The zero value indicates no limit. - MaxIntents int64 + MaxLockConflicts int64 // MemoryAccount is used for tracking memory allocations. MemoryAccount *mon.BoundAccount // LockTable is used to determine whether keys are locked in the in-memory @@ -5415,12 +5414,12 @@ func MVCCCheckForAcquireLock( txn *roachpb.Transaction, str lock.Strength, key roachpb.Key, - maxConflicts int64, + maxLockConflicts int64, ) error { if err := validateLockAcquisition(txn, str); err != nil { return err } - ltScanner, err := newLockTableKeyScanner(reader, txn, str, maxConflicts) + ltScanner, err := newLockTableKeyScanner(reader, txn, str, maxLockConflicts) if err != nil { return err } @@ -5440,12 +5439,12 @@ func MVCCAcquireLock( str lock.Strength, key roachpb.Key, ms *enginepb.MVCCStats, - maxConflicts int64, + maxLockConflicts int64, ) error { if err := validateLockAcquisition(txn, str); err != nil { return err } - ltScanner, err := newLockTableKeyScanner(rw, txn, str, maxConflicts) + ltScanner, err := newLockTableKeyScanner(rw, txn, str, maxLockConflicts) if err != nil { return err } @@ -7254,7 +7253,7 @@ func mvccExportToWriter( // If we do it means this export can't complete and is aborted. We need to loop over remaining data // to collect all matching intents before returning them in an error to the caller. if iter.NumCollectedIntents() > 0 { - for uint64(iter.NumCollectedIntents()) < opts.MaxIntents { + for uint64(iter.NumCollectedIntents()) < opts.MaxLockConflicts { iter.NextKey() // If we encounter other errors during intent collection, we return our original write intent failure. // We would find this new error again upon retry. @@ -7328,12 +7327,12 @@ type MVCCExportOptions struct { // to an SST that exceeds maxSize, an error will be returned. This parameter // exists to prevent creating SSTs which are too large to be used. MaxSize uint64 - // MaxIntents specifies the number of intents to collect and return in a - // LockConflictError. The caller will likely resolve the returned intents and - // retry the call, which would be quadratic, so this significantly reduces the - // overall number of scans. 0 disables batching and returns the first intent, - // pass math.MaxUint64 to collect all. - MaxIntents uint64 + // MaxLockConflicts specifies the number of locks (intents) to collect and + // return in a LockConflictError. The caller will likely resolve the returned + // intents and retry the call, which would be quadratic, so this significantly + // reduces the overall number of scans. 0 disables batching and returns the + // first intent, pass math.MaxUint64 to collect all. + MaxLockConflicts uint64 // If StopMidKey is false, once function reaches targetSize it would continue // adding all versions until it reaches next key or end of range. If true, it // would stop immediately when targetSize is reached and return the next versions diff --git a/pkg/storage/mvcc_history_test.go b/pkg/storage/mvcc_history_test.go index 0a2189ddec48..106daa83146c 100644 --- a/pkg/storage/mvcc_history_test.go +++ b/pkg/storage/mvcc_history_test.go @@ -101,7 +101,7 @@ var ( // put_blind_inline k= v= [prev=] // get [t=] [ts=[,]] [resolve [status=]] k= [inconsistent] [skipLocked] [tombstones] [failOnMoreRecent] [localUncertaintyLimit=[,]] [globalUncertaintyLimit=[,]] [maxKeys=] [targetBytes=] [allowEmpty] // scan [t=] [ts=[,]] [resolve [status=]] k= [end=] [inconsistent] [skipLocked] [tombstones] [reverse] [failOnMoreRecent] [localUncertaintyLimit=[,]] [globalUncertaintyLimit=[,]] [max=] [targetbytes=] [wholeRows[=]] [allowEmpty] -// export [k=] [end=] [ts=[,]] [kTs=[,]] [startTs=[,]] [maxIntents=] [allRevisions] [targetSize=] [maxSize=] [stopMidKey] [fingerprint] +// export [k=] [end=] [ts=[,]] [kTs=[,]] [startTs=[,]] [maxLockConflicts=] [allRevisions] [targetSize=] [maxSize=] [stopMidKey] [fingerprint] // // iter_new [k=] [end=] [prefix] [kind=key|keyAndIntents] [types=pointsOnly|pointsWithRanges|pointsAndRanges|rangesOnly] [maskBelow=[,]] // iter_new_incremental [k=] [end=] [startTs=[,]] [endTs=[,]] [types=pointsOnly|pointsWithRanges|pointsAndRanges|rangesOnly] [maskBelow=[,]] [intents=error|aggregate|emit] @@ -1531,8 +1531,8 @@ func cmdExport(e *evalCtx) error { StripIndexPrefixAndTimestamp: e.hasArg("stripped"), }, } - if e.hasArg("maxIntents") { - e.scanArg("maxIntents", &opts.MaxIntents) + if e.hasArg("maxLockConflicts") { + e.scanArg("maxLockConflicts", &opts.MaxLockConflicts) } if e.hasArg("targetSize") { e.scanArg("targetSize", &opts.TargetSize) diff --git a/pkg/storage/mvcc_incremental_iterator_test.go b/pkg/storage/mvcc_incremental_iterator_test.go index 5f81cee77624..5b563b2a3ff8 100644 --- a/pkg/storage/mvcc_incremental_iterator_test.go +++ b/pkg/storage/mvcc_incremental_iterator_test.go @@ -201,7 +201,7 @@ func assertExportedErrs( ExportAllRevisions: revisions, TargetSize: big, MaxSize: big, - MaxIntents: uint64(MaxIntentsPerLockConflictError.Default()), + MaxLockConflicts: uint64(MaxConflictsPerLockConflictError.Default()), StopMidKey: false, }, &bytes.Buffer{}) require.Error(t, err) diff --git a/pkg/storage/mvcc_stats_test.go b/pkg/storage/mvcc_stats_test.go index e5cee4f4ac1f..fd9b1c975319 100644 --- a/pkg/storage/mvcc_stats_test.go +++ b/pkg/storage/mvcc_stats_test.go @@ -1724,20 +1724,20 @@ func TestMVCCStatsRandomized(t *testing.T) { desc = fmt.Sprintf("mvccDeleteRangeUsingTombstone=%s", roachpb.Span{Key: mvccRangeDelKey, EndKey: mvccRangeDelEndKey}) const idempotent = false - const maxIntents = 0 // unlimited + const maxLockConflicts = 0 // unlimited msCovered := (*enginepb.MVCCStats)(nil) err = MVCCDeleteRangeUsingTombstone( ctx, s.batch, s.MSDelta, mvccRangeDelKey, mvccRangeDelEndKey, s.TS, hlc.ClockTimestamp{}, nil, /* leftPeekBound */ - nil /* rightPeekBound */, idempotent, maxIntents, msCovered, + nil /* rightPeekBound */, idempotent, maxLockConflicts, msCovered, ) } else { rangeTombstoneThreshold := s.rng.Int63n(5) desc = fmt.Sprintf("mvccPredicateDeleteRange=%s, predicates=%s, rangeTombstoneThreshold=%d", roachpb.Span{Key: mvccRangeDelKey, EndKey: mvccRangeDelEndKey}, predicates, rangeTombstoneThreshold) - const maxIntents = 0 // unlimited + const maxLockConflicts = 0 // unlimited _, err = MVCCPredicateDeleteRange(ctx, s.batch, s.MSDelta, mvccRangeDelKey, mvccRangeDelEndKey, s.TS, hlc.ClockTimestamp{}, nil /* leftPeekBound */, nil, /* rightPeekBound */ - predicates, 0, 0, rangeTombstoneThreshold, maxIntents) + predicates, 0, 0, rangeTombstoneThreshold, maxLockConflicts) } if err != nil { return false, desc + ": " + err.Error() diff --git a/pkg/storage/mvcc_test.go b/pkg/storage/mvcc_test.go index 61f0ee2f9672..a471aac1fd5c 100644 --- a/pkg/storage/mvcc_test.go +++ b/pkg/storage/mvcc_test.go @@ -597,7 +597,7 @@ func TestMVCCScanLockConflictError(t *testing.T) { for _, scan := range scanCases { t.Run(scan.name, func(t *testing.T) { res, err := MVCCScan(ctx, engine, testKey1, testKey6.Next(), - hlc.Timestamp{WallTime: 1}, MVCCScanOptions{Inconsistent: !scan.consistent, Txn: scan.txn, MaxIntents: 2}) + hlc.Timestamp{WallTime: 1}, MVCCScanOptions{Inconsistent: !scan.consistent, Txn: scan.txn, MaxLockConflicts: 2}) var lcErr *kvpb.LockConflictError _ = errors.As(err, &lcErr) if (err == nil) != (lcErr == nil) { @@ -6635,7 +6635,7 @@ func TestMVCCExportToSSTFailureIntentBatching(t *testing.T) { ExportAllRevisions: true, TargetSize: 0, MaxSize: 0, - MaxIntents: uint64(MaxIntentsPerLockConflictError.Default()), + MaxLockConflicts: uint64(MaxConflictsPerLockConflictError.Default()), StopMidKey: false, }, &bytes.Buffer{}) if len(expectedIntentIndices) == 0 { @@ -6655,7 +6655,7 @@ func TestMVCCExportToSSTFailureIntentBatching(t *testing.T) { } // Export range is fixed to k:["00010", "10000"), ts:(999, 2000] for all tests. - testDataCount := int(MaxIntentsPerLockConflictError.Default() + 1) + testDataCount := int(MaxConflictsPerLockConflictError.Default() + 1) testData := make([]testValue, testDataCount*2) expectedErrors := make([]int, testDataCount) for i := 0; i < testDataCount; i++ { @@ -6663,7 +6663,7 @@ func TestMVCCExportToSSTFailureIntentBatching(t *testing.T) { testData[i*2+1] = intent(key(i*2+12), "intent", ts(1001)) expectedErrors[i] = i*2 + 1 } - t.Run("Receive no more than limit intents", checkReportedErrors(testData, expectedErrors[:MaxIntentsPerLockConflictError.Default()])) + t.Run("Receive no more than limit intents", checkReportedErrors(testData, expectedErrors[:MaxConflictsPerLockConflictError.Default()])) } // TestMVCCExportToSSTSplitMidKey verifies that split mid key in exports will diff --git a/pkg/storage/pebble_mvcc_scanner.go b/pkg/storage/pebble_mvcc_scanner.go index de09c4bf9fbf..3524c35698ce 100644 --- a/pkg/storage/pebble_mvcc_scanner.go +++ b/pkg/storage/pebble_mvcc_scanner.go @@ -408,12 +408,12 @@ type pebbleMVCCScanner struct { // allowEmpty is false, and the partial row is the first row in the result, // the row will instead be completed by fetching additional KV pairs. wholeRows bool - // Stop adding intents and abort scan once maxIntents threshold is reached. - // This limit is only applicable to consistent scans since they return - // intents as an error. + // Stop adding intents and abort scan once maxLockConflicts threshold is + // reached. This limit is only applicable to consistent scans since they + // return intents as an error. // Not used in inconsistent scans. // Ignored if zero. - maxIntents int64 + maxLockConflicts int64 // Resume fields describe the resume span to return. resumeReason must be set // to a non-zero value to return a resume span, the others are optional. resumeReason kvpb.ResumeReason @@ -982,7 +982,7 @@ func (p *pebbleMVCCScanner) getOne(ctx context.Context) (ok, added bool) { // may want to resolve it. Unlike below, this intent will not result in // a LockConflictError because MVCC{Scan,Get}Options.errOnIntents returns // false when skipLocked in enabled. - if p.maxIntents == 0 || int64(p.intents.Count()) < p.maxIntents { + if p.maxLockConflicts == 0 || int64(p.intents.Count()) < p.maxLockConflicts { if !p.addCurIntent(ctx) { return false, false } @@ -1005,7 +1005,7 @@ func (p *pebbleMVCCScanner) getOne(ctx context.Context) (ok, added bool) { return false, false } // Limit number of intents returned in lock conflict error. - if p.maxIntents > 0 && int64(p.intents.Count()) >= p.maxIntents { + if p.maxLockConflicts > 0 && int64(p.intents.Count()) >= p.maxLockConflicts { p.resumeReason = kvpb.RESUME_INTENT_LIMIT return false, false } @@ -1816,7 +1816,7 @@ func (p *pebbleMVCCScanner) isKeyLockedByConflictingTxn( } if ok { // The key is locked or reserved, so ignore it. - if txn != nil && (p.maxIntents == 0 || int64(p.intents.Count()) < p.maxIntents) { + if txn != nil && (p.maxLockConflicts == 0 || int64(p.intents.Count()) < p.maxLockConflicts) { // However, if the key is locked, we return the lock holder separately // (if we have room); the caller may want to resolve it. if !p.addKeyAndMetaAsIntent(ctx, key, txn) { diff --git a/pkg/storage/sst.go b/pkg/storage/sst.go index 4b156f197cc9..a8108efafdca 100644 --- a/pkg/storage/sst.go +++ b/pkg/storage/sst.go @@ -112,7 +112,7 @@ func CheckSSTConflicts( disallowShadowing bool, disallowShadowingBelow hlc.Timestamp, sstTimestamp hlc.Timestamp, - maxIntents int64, + maxLockConflicts int64, usePrefixSeek bool, ) (enginepb.MVCCStats, error) { @@ -281,7 +281,7 @@ func CheckSSTConflicts( // would be quadratic, so this significantly reduces the overall number // of scans. intents = append(intents, roachpb.MakeIntent(mvccMeta.Txn, extIter.UnsafeKey().Key.Clone())) - if int64(len(intents)) >= maxIntents { + if int64(len(intents)) >= maxLockConflicts { return &kvpb.LockConflictError{Locks: roachpb.AsLocks(intents)} } return nil @@ -531,7 +531,7 @@ func CheckSSTConflicts( return enginepb.MVCCStats{}, err } intents = append(intents, roachpb.MakeIntent(mvccMeta.Txn, extIter.UnsafeKey().Key.Clone())) - if int64(len(intents)) >= maxIntents { + if int64(len(intents)) >= maxLockConflicts { return statsDiff, &kvpb.LockConflictError{Locks: roachpb.AsLocks(intents)} } extIter.Next() diff --git a/pkg/storage/sst_test.go b/pkg/storage/sst_test.go index a5b123f5f261..67e8003f87f8 100644 --- a/pkg/storage/sst_test.go +++ b/pkg/storage/sst_test.go @@ -29,7 +29,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestCheckSSTConflictsMaxIntents(t *testing.T) { +func TestCheckSSTConflictsMaxLockConflicts(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -38,15 +38,15 @@ func TestCheckSSTConflictsMaxIntents(t *testing.T) { start, end := "a", "z" testcases := []struct { - maxIntents int64 - expectIntents []string + maxLockConflicts int64 + expectIntents []string }{ - {maxIntents: -1, expectIntents: []string{"a"}}, - {maxIntents: 0, expectIntents: []string{"a"}}, - {maxIntents: 1, expectIntents: []string{"a"}}, - {maxIntents: 2, expectIntents: []string{"a", "b"}}, - {maxIntents: 3, expectIntents: []string{"a", "b", "c"}}, - {maxIntents: 4, expectIntents: []string{"a", "b", "c"}}, + {maxLockConflicts: -1, expectIntents: []string{"a"}}, + {maxLockConflicts: 0, expectIntents: []string{"a"}}, + {maxLockConflicts: 1, expectIntents: []string{"a"}}, + {maxLockConflicts: 2, expectIntents: []string{"a", "b"}}, + {maxLockConflicts: 3, expectIntents: []string{"a", "b", "c"}}, + {maxLockConflicts: 4, expectIntents: []string{"a", "b", "c"}}, } // Create SST with keys equal to intents at txn2TS. @@ -86,13 +86,13 @@ func TestCheckSSTConflictsMaxIntents(t *testing.T) { require.NoError(t, engine.Flush()) for _, tc := range testcases { - t.Run(fmt.Sprintf("maxIntents=%d", tc.maxIntents), func(t *testing.T) { + t.Run(fmt.Sprintf("maxLockConflicts=%d", tc.maxLockConflicts), func(t *testing.T) { for _, usePrefixSeek := range []bool{false, true} { t.Run(fmt.Sprintf("usePrefixSeek=%v", usePrefixSeek), func(t *testing.T) { // Provoke and check LockConflictError. startKey, endKey := MVCCKey{Key: roachpb.Key(start)}, MVCCKey{Key: roachpb.Key(end)} _, err := CheckSSTConflicts(ctx, sstFile.Bytes(), engine, startKey, endKey, startKey.Key, endKey.Key.Next(), - false /*disallowShadowing*/, hlc.Timestamp{} /*disallowShadowingBelow*/, hlc.Timestamp{} /* sstReqTS */, tc.maxIntents, usePrefixSeek) + false /*disallowShadowing*/, hlc.Timestamp{} /*disallowShadowingBelow*/, hlc.Timestamp{} /* sstReqTS */, tc.maxLockConflicts, usePrefixSeek) require.Error(t, err) lcErr := &kvpb.LockConflictError{} require.ErrorAs(t, err, &lcErr) diff --git a/pkg/storage/testdata/mvcc_histories/export b/pkg/storage/testdata/mvcc_histories/export index 6fe60279c9b0..9fe8b0299143 100644 --- a/pkg/storage/testdata/mvcc_histories/export +++ b/pkg/storage/testdata/mvcc_histories/export @@ -84,12 +84,12 @@ export k=a end=z error: (*kvpb.LockConflictError:) conflicting locks on "a" run error -export k=a end=z maxIntents=100 +export k=a end=z maxLockConflicts=100 ---- error: (*kvpb.LockConflictError:) conflicting locks on "a", "d", "j", "l", "o" run error -export k=a end=z maxIntents=3 +export k=a end=z maxLockConflicts=3 ---- error: (*kvpb.LockConflictError:) conflicting locks on "a", "d", "j" diff --git a/pkg/storage/testdata/mvcc_histories/export_fingerprint b/pkg/storage/testdata/mvcc_histories/export_fingerprint index dd1941c8ba45..5b2f45a63735 100644 --- a/pkg/storage/testdata/mvcc_histories/export_fingerprint +++ b/pkg/storage/testdata/mvcc_histories/export_fingerprint @@ -84,12 +84,12 @@ export fingerprint k=a end=z error: (*kvpb.LockConflictError:) conflicting locks on "a" run error -export fingerprint k=a end=z maxIntents=100 +export fingerprint k=a end=z maxLockConflicts=100 ---- error: (*kvpb.LockConflictError:) conflicting locks on "a", "d", "j", "l", "o" run error -export fingerprint k=a end=z maxIntents=3 +export fingerprint k=a end=z maxLockConflicts=3 ---- error: (*kvpb.LockConflictError:) conflicting locks on "a", "d", "j"