110807: flowinfra: fix recently introduced minor bug r=yuzefovich a=yuzefovich

This commit fixes a minor bug introduced in cockroachdb#110625. In particular, that PR made it so that we now unconditionally call `ConsumerClosed` on the "head" processor to make sure that resources are properly closed in the pausable portal model. However, `ConsumerClosed` may only be called if `Start` was called, so this commit adds that check. Additionally, to de-risk cockroachdb#110625 further, we now only call that method for local flows, since pausable portals currently disable DistSQL.
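To make the guard concrete, here is a self-contained sketch condensed from the `pkg/sql/flowinfra/flow.go` diff below. The stand-in types are hypothetical scaffolding; the guard conditions and field names follow the patch:

```go
package flowinfra

// RowSource is a stand-in for execinfra.RowSource; only the method that
// matters here is included.
type RowSource interface {
	ConsumerClosed()
}

// FlowBase is a pared-down stand-in for the real struct.
type FlowBase struct {
	local           bool
	processors      []interface{}
	headProcStarted bool // set in Run just before the head processor starts
}

func (f *FlowBase) IsLocal() bool { return f.local }

// ConsumerClosedOnHeadProc mirrors the guarded cleanup added by this commit:
// a no-op unless the flow is local, has exactly one processor, that processor
// was started, and it implements RowSource.
func (f *FlowBase) ConsumerClosedOnHeadProc() {
	if !f.IsLocal() {
		return // pausable portals currently disable DistSQL
	}
	if len(f.processors) != 1 {
		return
	}
	if !f.headProcStarted {
		return // ConsumerClosed is only valid after Start
	}
	if rs, ok := f.processors[0].(RowSource); ok {
		rs.ConsumerClosed()
	}
}
```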

Epic: None

Release note: None

110903: storage: rename MaxIntentsPerLockConflictError to MaxConflictsPerLockConflictError r=nvanbenschoten a=nvanbenschoten

Informs cockroachdb#110902.

This commit is a broad rename of "max intents" to "max lock conflicts", to better reflect the fact that locks of any strength can be returned in LockConflictErrors. The semantics of which locks conflict with which operations are unchanged.
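For illustration, a sketch of a typical call site after the rename (the wrapper function is hypothetical; the setting name, its `int64` value, and the package paths are as in the diffs below):

```go
import (
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/storage"
)

// maxLockConflicts reads the renamed cluster setting. The value and its
// semantics are identical to the old MaxIntentsPerLockConflictError; only
// the identifiers change.
func maxLockConflicts(st *cluster.Settings) int64 {
	// Before this commit: storage.MaxIntentsPerLockConflictError.Get(&st.SV)
	return storage.MaxConflictsPerLockConflictError.Get(&st.SV)
}
```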

Release note: None

110916: storage: fix invariant Value assertion r=itsbilal a=jbowens

Calling Iterator.Value is only permitted on a valid iterator positioned over a point key. Previously, the storage package's invariant assertions could attempt to read the Value of an iterator positioned only at a range key start bound.
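A minimal sketch of the corrected access pattern (the helper is hypothetical; `HasPointAndRange` and `UnsafeValue` are the iterator methods used by the fix in `pkg/storage/engine.go` below):

```go
import "github.com/cockroachdb/cockroach/pkg/storage"

// pointValueIfAny reads the iterator's value only when the iterator is
// positioned on a point key. On a bare range key (e.g. its start bound)
// there is no point value, and calling Value/UnsafeValue would be invalid.
func pointValueIfAny(iter storage.MVCCIterator) (value []byte, ok bool, err error) {
	if hasPoint, _ := iter.HasPointAndRange(); !hasPoint {
		return nil, false, nil
	}
	v, err := iter.UnsafeValue()
	if err != nil {
		return nil, false, err
	}
	return v, true, nil
}
```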

Epic: none
Fixes cockroachdb#110771.
Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Jackson Owens <[email protected]>
4 people committed Sep 19, 2023
4 parents 3071cac + b06375d + 2520541 + 50cd55e commit 0ff99f1
Showing 22 changed files with 130 additions and 113 deletions.
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable.go
@@ -222,7 +222,7 @@ func EvalAddSSTable(
}

var statsDelta enginepb.MVCCStats
-maxIntents := storage.MaxIntentsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)
+maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)
checkConflicts := args.DisallowConflicts || args.DisallowShadowing ||
!args.DisallowShadowingBelow.IsEmpty()
if checkConflicts {
@@ -258,7 +258,7 @@ func EvalAddSSTable(

log.VEventf(ctx, 2, "checking conflicts for SSTable [%s,%s)", start.Key, end.Key)
statsDelta, err = storage.CheckSSTConflicts(ctx, sst, readWriter, start, end, leftPeekBound, rightPeekBound,
-args.DisallowShadowing, args.DisallowShadowingBelow, sstTimestamp, maxIntents, usePrefixSeek)
+args.DisallowShadowing, args.DisallowShadowingBelow, sstTimestamp, maxLockConflicts, usePrefixSeek)
statsDelta.Add(sstReqStatsDelta)
if err != nil {
return result.Result{}, errors.Wrap(err, "checking for key collisions")
@@ -269,7 +269,7 @@ func EvalAddSSTable(
// caller is expected to make sure there are no writers across the span,
// and thus no or few locks, so this is cheap in the common case.
log.VEventf(ctx, 2, "checking conflicting locks for SSTable [%s,%s)", start.Key, end.Key)
-locks, err := storage.ScanLocks(ctx, readWriter, start.Key, end.Key, maxIntents, 0)
+locks, err := storage.ScanLocks(ctx, readWriter, start.Key, end.Key, maxLockConflicts, 0)
if err != nil {
return result.Result{}, errors.Wrap(err, "scanning locks")
} else if len(locks) > 0 {
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_clear_range.go
@@ -112,8 +112,8 @@ func ClearRange(
// txns. Otherwise, txn recovery would fail to find these intents and
// consider the txn incomplete, uncommitting it and its writes (even those
// outside of the cleared range).
-maxIntents := storage.MaxIntentsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)
-locks, err := storage.ScanLocks(ctx, readWriter, from, to, maxIntents, 0)
+maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)
+locks, err := storage.ScanLocks(ctx, readWriter, from, to, maxLockConflicts, 0)
if err != nil {
return result.Result{}, err
} else if len(locks) > 0 {
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_delete_range.go
@@ -152,7 +152,7 @@ func DeleteRange(

leftPeekBound, rightPeekBound := rangeTombstonePeekBounds(
args.Key, args.EndKey, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey())
-maxIntents := storage.MaxIntentsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)
+maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV)

// If no predicate parameters are passed, use the fast path. If we're
// deleting the entire Raft range, use an even faster path that avoids a
@@ -167,7 +167,7 @@
}
if err := storage.MVCCDeleteRangeUsingTombstone(ctx, readWriter, cArgs.Stats,
args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound,
-args.IdempotentTombstone, maxIntents, statsCovered); err != nil {
+args.IdempotentTombstone, maxLockConflicts, statsCovered); err != nil {
return result.Result{}, err
}
var res result.Result
@@ -197,7 +197,7 @@ func DeleteRange(
resumeSpan, err := storage.MVCCPredicateDeleteRange(ctx, readWriter, cArgs.Stats,
args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound,
args.Predicates, h.MaxSpanRequestKeys, maxDeleteRangeBatchBytes,
-defaultRangeTombstoneThreshold, maxIntents)
+defaultRangeTombstoneThreshold, maxLockConflicts)
if err != nil {
return result.Result{}, err
}
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_export.go
@@ -154,9 +154,9 @@ func evalExport(
maxSize = targetSize + uint64(allowedOverage)
}

-var maxIntents uint64
-if m := storage.MaxIntentsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV); m > 0 {
-maxIntents = uint64(m)
+var maxLockConflicts uint64
+if m := storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV); m > 0 {
+maxLockConflicts = uint64(m)
}

// Only use resume timestamp if splitting mid key is enabled.
@@ -184,7 +184,7 @@ func evalExport(
ExportAllRevisions: exportAllRevisions,
TargetSize: targetSize,
MaxSize: maxSize,
-MaxIntents: maxIntents,
+MaxLockConflicts: maxLockConflicts,
StopMidKey: args.SplitMidKey,
}
var summary kvpb.BulkOpSummary
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_reverse_scan.go
@@ -46,7 +46,7 @@ func ReverseScan(
ScanStats: cArgs.ScanStats,
Uncertainty: cArgs.Uncertainty,
MaxKeys: h.MaxSpanRequestKeys,
-MaxIntents: storage.MaxIntentsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
+MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
TargetBytes: h.TargetBytes,
AllowEmpty: h.AllowEmpty,
WholeRowsOfSize: h.WholeRowsOfSize,
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_scan.go
@@ -46,7 +46,7 @@ func Scan(
ScanStats: cArgs.ScanStats,
Uncertainty: cArgs.Uncertainty,
MaxKeys: h.MaxSpanRequestKeys,
-MaxIntents: storage.MaxIntentsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
+MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
TargetBytes: h.TargetBytes,
AllowEmpty: h.AllowEmpty,
WholeRowsOfSize: h.WholeRowsOfSize,
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_read.go
@@ -303,7 +303,7 @@ func (r *Replica) canDropLatchesBeforeEval(
ctx, 3, "can drop latches early for batch (%v); scanning lock table first to detect conflicts", ba,
)

-maxIntents := storage.MaxIntentsPerLockConflictError.Get(&r.store.cfg.Settings.SV)
+maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&r.store.cfg.Settings.SV)
var intents []roachpb.Intent
// Check if any of the requests within the batch need to resolve any intents
// or if any of them need to use an intent interleaving iterator.
@@ -315,15 +315,15 @@
txnID = ba.Txn.ID
}
needsIntentInterleavingForThisRequest, err := storage.ScanConflictingIntentsForDroppingLatchesEarly(
-ctx, rw, txnID, ba.Header.Timestamp, start, end, &intents, maxIntents,
+ctx, rw, txnID, ba.Header.Timestamp, start, end, &intents, maxLockConflicts,
)
if err != nil {
return false /* ok */, true /* stillNeedsIntentInterleaving */, kvpb.NewError(
errors.Wrap(err, "scanning intents"),
)
}
stillNeedsIntentInterleaving = stillNeedsIntentInterleaving || needsIntentInterleavingForThisRequest
-if maxIntents != 0 && int64(len(intents)) >= maxIntents {
+if maxLockConflicts != 0 && int64(len(intents)) >= maxLockConflicts {
break
}
}
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/store_test.go
@@ -2306,9 +2306,9 @@ func TestStoreScanIntents(t *testing.T) {
// limits.
//
// The test proceeds as follows: a writer lays down more than
-// `MaxIntentsPerLockConflictError` intents, and a reader is expected to
+// `MaxConflictsPerLockConflictError` intents, and a reader is expected to
// encounter these intents and raise a `LockConflictError` with exactly
-// `MaxIntentsPerLockConflictError` intents in the error.
+// `MaxConflictsPerLockConflictError` intents in the error.
func TestStoreScanIntentsRespectsLimit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -2331,10 +2331,10 @@ func TestStoreScanIntentsRespectsLimit(t *testing.T) {
ctx context.Context, ba *kvpb.BatchRequest, pErr *kvpb.Error,
) {
if errors.HasType(pErr.GoError(), (*kvpb.LockConflictError)(nil)) {
-// Assert that the LockConflictError has MaxIntentsPerLockConflictError intents.
+// Assert that the LockConflictError has MaxConflictsPerLockConflictError intents.
if trap := interceptLockConflictErrors.Load(); trap != nil && trap.(bool) {
require.Equal(
-t, storage.MaxIntentsPerLockConflictErrorDefault,
+t, storage.MaxConflictsPerLockConflictErrorDefault,
len(pErr.GetDetail().(*kvpb.LockConflictError).Locks),
)
interceptLockConflictErrors.Store(false)
@@ -2356,13 +2356,13 @@ func TestStoreScanIntentsRespectsLimit(t *testing.T) {
var wg sync.WaitGroup
wg.Add(2)

-// Lay down more than `MaxIntentsPerLockConflictErrorDefault` intents.
+// Lay down more than `MaxConflictsPerLockConflictErrorDefault` intents.
go func() {
defer wg.Done()
txn := newTransaction(
"test", roachpb.Key("test-key"), roachpb.NormalUserPriority, tc.Server(0).Clock(),
)
-for j := 0; j < storage.MaxIntentsPerLockConflictErrorDefault+10; j++ {
+for j := 0; j < storage.MaxConflictsPerLockConflictErrorDefault+10; j++ {
var key roachpb.Key
key = append(key, keys.ScratchRangeMin...)
key = append(key, []byte(fmt.Sprintf("%d", j))...)
@@ -2385,10 +2385,10 @@ func TestStoreScanIntentsRespectsLimit(t *testing.T) {
}

// Now, expect a conflicting reader to encounter the intents and raise a
-// LockConflictError with exactly `MaxIntentsPerLockConflictErrorDefault`
+// LockConflictError with exactly `MaxConflictsPerLockConflictErrorDefault`
// intents. See the TestingConcurrencyRetryFilter above.
var ba kv.Batch
-for i := 0; i < storage.MaxIntentsPerLockConflictErrorDefault+10; i += 10 {
+for i := 0; i < storage.MaxConflictsPerLockConflictErrorDefault+10; i += 10 {
for _, key := range intentKeys[i : i+10] {
args := getArgs(key)
ba.AddRawRequest(&args)
14 changes: 14 additions & 0 deletions pkg/sql/flowinfra/flow.go
@@ -214,6 +214,10 @@ type FlowBase struct {
// goroutines.
startedGoroutines bool

+// headProcStarted tracks whether Start was called on the "head" processor
+// in Run.
+headProcStarted bool
+
// inboundStreams are streams that receive data from other hosts; this map
// is to be passed to FlowRegistry.RegisterFlow. This map is populated in
// Flow.Setup(), so it is safe to lookup into concurrently later.
@@ -571,6 +575,7 @@ func (f *FlowBase) Run(ctx context.Context, noWait bool) {
}
f.resumeCtx = ctx
log.VEventf(ctx, 1, "running %T in the flow's goroutine", headProc)
+f.headProcStarted = true
headProc.Run(ctx, headOutput)
}

@@ -661,17 +666,26 @@ func (f *FlowBase) GetOnCleanupFns() (startCleanup, endCleanup func()) {
// ConsumerClosed on the source (i.e. the "head" processor).
//
// The method is only called if:
+// - the flow is local (pausable portals currently don't support DistSQL)
// - there is exactly 1 processor in the flow that runs in its own goroutine
//   (which is always the case for pausable portal model at this time)
+// - Start was called on that processor (ConsumerClosed is only valid to be
+//   called after Start)
// - that single processor implements execinfra.RowSource interface (those
//   processors that don't implement it shouldn't be running through pausable
//   portal model).
//
// Otherwise, this method is a noop.
func (f *FlowBase) ConsumerClosedOnHeadProc() {
+if !f.IsLocal() {
+return
+}
if len(f.processors) != 1 {
return
}
+if !f.headProcStarted {
+return
+}
rs, ok := f.processors[0].(execinfra.RowSource)
if !ok {
return
30 changes: 17 additions & 13 deletions pkg/storage/engine.go
@@ -1945,17 +1945,21 @@ func assertMVCCIteratorInvariants(iter MVCCIterator) error {
return errors.AssertionFailedf("unknown type for engine key %s", engineKey)
}

-// Value must equal UnsafeValue.
-u, err := iter.UnsafeValue()
-if err != nil {
-return err
-}
-v, err := iter.Value()
-if err != nil {
-return err
-}
-if !bytes.Equal(v, u) {
-return errors.AssertionFailedf("Value %x does not match UnsafeValue %x at %s", v, u, key)
+// If the iterator position has a point key, Value must equal UnsafeValue.
+// NB: It's only valid to read an iterator's Value if the iterator is
+// positioned at a point key.
+if hasPoint, _ := iter.HasPointAndRange(); hasPoint {
+u, err := iter.UnsafeValue()
+if err != nil {
+return err
+}
+v, err := iter.Value()
+if err != nil {
+return err
+}
+if !bytes.Equal(v, u) {
+return errors.AssertionFailedf("Value %x does not match UnsafeValue %x at %s", v, u, key)
+}
}

// For prefix iterators, any range keys must be point-sized. We've already
@@ -1994,7 +1998,7 @@ func ScanConflictingIntentsForDroppingLatchesEarly(
ts hlc.Timestamp,
start, end roachpb.Key,
intents *[]roachpb.Intent,
-maxIntents int64,
+maxLockConflicts int64,
) (needIntentHistory bool, err error) {
if err := ctx.Err(); err != nil {
return false, err
@@ -2031,7 +2035,7 @@
var meta enginepb.MVCCMetadata
var ok bool
for ok, err = iter.SeekEngineKeyGE(EngineKey{Key: ltStart}); ok; ok, err = iter.NextEngineKey() {
-if maxIntents != 0 && int64(len(*intents)) >= maxIntents {
+if maxLockConflicts != 0 && int64(len(*intents)) >= maxLockConflicts {
// Return early if we're done accumulating intents; make no claims about
// not needing intent history.
return true /* needsIntentHistory */, nil
4 changes: 2 additions & 2 deletions pkg/storage/engine_test.go
@@ -2264,7 +2264,7 @@ func TestScanConflictingIntentsForDroppingLatchesEarly(t *testing.T) {
tc.start,
tc.end,
&intents,
-0, /* maxIntents */
+0, /* maxLockConflicts */
)
if tc.expErr != "" {
require.Error(t, err)
@@ -2486,7 +2486,7 @@ func TestScanConflictingIntentsForDroppingLatchesEarlyReadYourOwnWrites(t *testing.T) {
keyA,
nil,
&intents,
-0, /* maxIntents */
+0, /* maxLockConflicts */
)
require.NoError(t, err)
if alwaysFallbackToIntentInterleavingIteratorForReadYourOwnWrites {
2 changes: 1 addition & 1 deletion pkg/storage/metamorphic/operations.go
@@ -335,7 +335,7 @@ func (m mvccDeleteRangeUsingRangeTombstoneOp) run(ctx context.Context) string {
}

err := storage.MVCCDeleteRangeUsingTombstone(ctx, writer, nil, m.key, m.endKey, m.ts,
-hlc.ClockTimestamp{}, m.key, m.endKey, false /* idempotent */, math.MaxInt64, /* maxIntents */
+hlc.ClockTimestamp{}, m.key, m.endKey, false /* idempotent */, math.MaxInt64, /* maxLockConflicts */
nil /* msCovered */)
if err != nil {
return fmt.Sprintf("error: %s", err)
