112845: storage: fix data race in MVCCPredicateDeleteRange r=msbutler a=stevendanna

This fixes two bugs in MVCCPredicateDeleteRange that would result in incorrect logical operations being published to rangefeeds.

When publishing a logical operation, the caller indicates whether the buffers provided for the Key and EndKey of the operation are safe for concurrent use by setting a `Safe` boolean field in the published operation.

MVCCPredicateDeleteRange calls either MVCCDeleteRangeUsingTombstone or mvccPutInternal, both of which publish logical ops with Safe set to true.

Unfortunately, the code in MVCCPredicateDeleteRange _did_ invalidate the keys passed to these functions.

As a result, it was possible for rangefeeds to observe:

1. Range keys with incorrect bounds,
2. Missed point deletion events,
3. Unnecessarily duplicated point deletions, and
4. Point deletions that never occurred (but were part of range deletions).

Additionally, since we expect to be able to read the values for a logical operation back out of storage, rangefeeds may also encounter errors when they fail to read back the value for an erroneous event.
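
The aliasing hazard described above can be illustrated with a minimal, self-contained sketch. The `publishedOp` type and the `reuseSlot` and `freshCopy` helpers are hypothetical names invented for this illustration; they are not part of the CockroachDB code base, and the real logical-op plumbing is considerably more involved. The sketch only shows why overwriting a buffer in place is unsafe once another component has retained a reference to it.

```go
package main

import "fmt"

// publishedOp is a hypothetical stand-in for a logical operation that
// retains a reference to the caller's key buffer (the "Safe = true" case).
type publishedOp struct {
	Key []byte
}

// reuseSlot mimics the old buffering pattern: the new key is written into
// the slot's existing backing array, so any earlier holder of that slice
// observes the new bytes.
func reuseSlot(slot []byte, key []byte) []byte {
	return append(slot[:0], key...)
}

// freshCopy allocates new memory for every key, so earlier holders of
// previously returned slices are unaffected.
func freshCopy(key []byte) []byte {
	return append([]byte(nil), key...)
}

func main() {
	slot := reuseSlot(nil, []byte("apple"))
	op := publishedOp{Key: slot}          // the consumer retains the slice
	_ = reuseSlot(slot, []byte("berry"))  // the producer reuses the buffer
	fmt.Printf("aliased key: %s\n", op.Key) // no longer "apple"

	safe := freshCopy([]byte("apple"))
	op2 := publishedOp{Key: safe}
	_ = freshCopy([]byte("berry"))
	fmt.Printf("copied key:  %s\n", op2.Key) // still "apple"
}
```

In the aliased case the consumer ends up holding a key it was never given, which is the kind of corruption that could surface as the incorrect rangefeed events listed above.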

Fixes #112733

Epic: none

Release note(bug fix): Fix a bug that would result in incorrect physical replication results for users of the private preview physical replication feature.

113520: kvserver: prevent infinite lock discovery hazard on replayed writes r=nvanbenschoten a=arulajmani

This patch prevents the possible infinite lock discovery hazard identified in #112409 and adds a regression test for it.

We prevent the hazard by using the lock acquisition constructed by `mvccPutInternal` and bubbled up in 6abea12 instead of constructing it later a few layers above. As no lock acquisition struct is constructed for replayed writes, we no longer communicate the wrong intent timestamp to the lock table.

I've verified that the test I added fails with the following diff applied:
```go
--- a/pkg/storage/mvcc.go
+++ b/pkg/storage/mvcc.go
@@ -2338,7 +2338,7 @@ func mvccPutInternal(
                                // transaction's write timestamp in cases where it was originally
                                // written at a lower timestamp.
                                return false,
-                                       roachpb.LockAcquisition{},
+                                       roachpb.MakeLockAcquisition(opts.Txn.TxnMeta, key, lock.Replicated, lock.Intent, opts.Txn.IgnoredSeqNums),
                                        replayTransactionalWrite(ctx, iter, meta, key, value,
                                                opts.Txn, valueFn, opts.ReplayWriteTimestampProtection,
                                        )
```

Closes #112409

Release note: None

Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Arul Ajmani <[email protected]>
3 people committed Nov 1, 2023
3 parents 028d1fc + c3ad4cd + c097052 commit 150623d
Showing 16 changed files with 309 additions and 43 deletions.
8 changes: 5 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_conditional_put.go
@@ -18,6 +18,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 )
@@ -67,15 +68,16 @@ func ConditionalPut(
 	}
 
 	var err error
+	var acq roachpb.LockAcquisition
 	if args.Blind {
-		_, err = storage.MVCCBlindConditionalPut(
+		acq, err = storage.MVCCBlindConditionalPut(
 			ctx, readWriter, args.Key, ts, args.Value, args.ExpBytes, handleMissing, opts)
 	} else {
-		_, err = storage.MVCCConditionalPut(
+		acq, err = storage.MVCCConditionalPut(
 			ctx, readWriter, args.Key, ts, args.Value, args.ExpBytes, handleMissing, opts)
 	}
 	if err != nil {
 		return result.Result{}, err
 	}
-	return result.FromAcquiredLocks(h.Txn, args.Key), nil
+	return result.WithAcquiredLocks(acq), nil
 }
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_delete.go
@@ -16,6 +16,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 )
 
@@ -40,7 +41,8 @@ func Delete(
 	}
 
 	var err error
-	reply.FoundKey, _, err = storage.MVCCDelete(
+	var acq roachpb.LockAcquisition
+	reply.FoundKey, acq, err = storage.MVCCDelete(
 		ctx, readWriter, args.Key, h.Timestamp, opts,
 	)
 	if err != nil {
@@ -56,5 +58,5 @@ func Delete(
 		}
 	}
 
-	return result.FromAcquiredLocks(h.Txn, args.Key), nil
+	return result.WithAcquiredLocks(acq), nil
 }
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_delete_range.go
@@ -262,7 +262,7 @@ func DeleteRange(
 	// written if we're evaluating the DeleteRange for a transaction so that we
 	// can update the Result's AcquiredLocks field.
 	returnKeys := args.ReturnKeys || h.Txn != nil
-	deleted, resumeSpan, num, _, err := storage.MVCCDeleteRange(
+	deleted, resumeSpan, num, acqs, err := storage.MVCCDeleteRange(
 		ctx, readWriter, args.Key, args.EndKey,
 		h.MaxSpanRequestKeys, timestamp,
 		opts, returnKeys)
@@ -287,5 +287,5 @@ func DeleteRange(
 		}
 	}
 
-	return result.FromAcquiredLocks(h.Txn, deleted...), nil
+	return result.WithAcquiredLocks(acqs...), nil
 }
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_increment.go
@@ -15,6 +15,7 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 )
 
@@ -41,10 +42,11 @@ func Increment(
 	}
 
 	var err error
-	reply.NewValue, _, err = storage.MVCCIncrement(
+	var acq roachpb.LockAcquisition
+	reply.NewValue, acq, err = storage.MVCCIncrement(
 		ctx, readWriter, args.Key, h.Timestamp, opts, args.Increment)
 	if err != nil {
 		return result.Result{}, err
 	}
-	return result.FromAcquiredLocks(h.Txn, args.Key), nil
+	return result.WithAcquiredLocks(acq), nil
 }
8 changes: 5 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_init_put.go
@@ -15,6 +15,7 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 )
 
@@ -45,15 +46,16 @@ func InitPut(
 	}
 
 	var err error
+	var acq roachpb.LockAcquisition
 	if args.Blind {
-		_, err = storage.MVCCBlindInitPut(
+		acq, err = storage.MVCCBlindInitPut(
 			ctx, readWriter, args.Key, h.Timestamp, args.Value, args.FailOnTombstones, opts)
 	} else {
-		_, err = storage.MVCCInitPut(
+		acq, err = storage.MVCCInitPut(
 			ctx, readWriter, args.Key, h.Timestamp, args.Value, args.FailOnTombstones, opts)
 	}
 	if err != nil {
 		return result.Result{}, err
 	}
-	return result.FromAcquiredLocks(h.Txn, args.Key), nil
+	return result.WithAcquiredLocks(acq), nil
 }
8 changes: 5 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)
Expand Down Expand Up @@ -63,13 +64,14 @@ func Put(
}

var err error
var acq roachpb.LockAcquisition
if args.Blind {
_, err = storage.MVCCBlindPut(ctx, readWriter, args.Key, ts, args.Value, opts)
acq, err = storage.MVCCBlindPut(ctx, readWriter, args.Key, ts, args.Value, opts)
} else {
_, err = storage.MVCCPut(ctx, readWriter, args.Key, ts, args.Value, opts)
acq, err = storage.MVCCPut(ctx, readWriter, args.Key, ts, args.Value, opts)
}
if err != nil {
return result.Result{}, err
}
return result.FromAcquiredLocks(h.Txn, args.Key), nil
return result.WithAcquiredLocks(acq), nil
}
1 change: 0 additions & 1 deletion pkg/kv/kvserver/batcheval/result/BUILD.bazel
@@ -11,7 +11,6 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/kv/kvpb",
-        "//pkg/kv/kvserver/concurrency/lock",
         "//pkg/kv/kvserver/kvserverpb",
         "//pkg/roachpb",
         "//pkg/util/log",
33 changes: 20 additions & 13 deletions pkg/kv/kvserver/batcheval/result/intent.go
@@ -10,23 +10,30 @@
 
 package result
 
-import (
-	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
-	"github.com/cockroachdb/cockroach/pkg/roachpb"
-)
+import "github.com/cockroachdb/cockroach/pkg/roachpb"
 
-// FromAcquiredLocks creates a Result communicating that the locks were
-// acquired or re-acquired by the given transaction and should be handled.
-func FromAcquiredLocks(txn *roachpb.Transaction, keys ...roachpb.Key) Result {
+// WithAcquiredLocks creates a Result communicating that the supplied lock
+// acquisitions (or re-acquisitions) were performed by the caller and they
+// should be handled.
+//
+// If any empty lock acquisitions are supplied by the caller they will not be
+// attached to the returned Result.
+func WithAcquiredLocks(acqs ...roachpb.LockAcquisition) Result {
 	var pd Result
-	if txn == nil {
+	numAcqs := 0
+	for _, acq := range acqs {
+		if !acq.Empty() {
+			numAcqs++
+		}
+	}
+	if numAcqs == 0 { // only allocate if there is at least 1 non-empty acquisition
 		return pd
 	}
-	pd.Local.AcquiredLocks = make([]roachpb.LockAcquisition, len(keys))
-	for i := range pd.Local.AcquiredLocks {
-		pd.Local.AcquiredLocks[i] = roachpb.MakeLockAcquisition(
-			txn.TxnMeta, keys[i], lock.Replicated, lock.Intent, txn.IgnoredSeqNums,
-		)
+	pd.Local.AcquiredLocks = make([]roachpb.LockAcquisition, 0, numAcqs)
+	for _, acq := range acqs {
+		if !acq.Empty() {
+			pd.Local.AcquiredLocks = append(pd.Local.AcquiredLocks, acq)
+		}
 	}
 	return pd
 }
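
The count-then-allocate filtering in `WithAcquiredLocks` can be sketched in isolation. This is a simplified illustration, not the real implementation: the `acquisition` type and the `nonEmpty` function are hypothetical stand-ins, and the assumption that an "empty" acquisition is simply the zero value is made only for this example.

```go
package main

import "fmt"

// acquisition is a hypothetical, simplified stand-in for
// roachpb.LockAcquisition; only the key is modelled here.
type acquisition struct {
	Key string
}

// Empty reports whether the acquisition is the zero value. Treating the zero
// value as "empty" is an assumption made for this sketch.
func (a acquisition) Empty() bool { return a == (acquisition{}) }

// nonEmpty returns only the non-empty acquisitions. It counts first so that
// nothing is allocated when every input is empty (for example, when every
// write was a replay), and so that the result is sized exactly otherwise.
func nonEmpty(acqs ...acquisition) []acquisition {
	n := 0
	for _, a := range acqs {
		if !a.Empty() {
			n++
		}
	}
	if n == 0 {
		return nil
	}
	out := make([]acquisition, 0, n)
	for _, a := range acqs {
		if !a.Empty() {
			out = append(out, a)
		}
	}
	return out
}

func main() {
	got := nonEmpty(acquisition{}, acquisition{Key: "a"}, acquisition{})
	fmt.Println(len(got), got[0].Key)
	fmt.Println(nonEmpty(acquisition{}) == nil)
}
```

Counting before allocating costs a second pass over the inputs, but keeps the common single-key case free of any allocation when the acquisition is empty.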
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/concurrency/lock_table.go
@@ -3970,6 +3970,9 @@ func (t *lockTableImpl) AcquireLock(acq *roachpb.LockAcquisition) error {
 		// If not enabled, don't track any locks.
 		return nil
 	}
+	if acq.Empty() {
+		return errors.AssertionFailedf("unexpected empty lock acquisition %s", acq)
+	}
 	switch acq.Strength {
 	case lock.Intent:
 		assert(acq.Durability == lock.Replicated, "incorrect durability")
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_rangefeed.go
@@ -715,7 +715,7 @@ func (r *Replica) handleLogicalOpLogRaftMuLocked(
 		}
 		if err != nil {
 			r.disconnectRangefeedWithErr(p, kvpb.NewErrorf(
-				"error consuming %T for key %v @ ts %v: %v", op, key, ts, err,
+				"error consuming %T for key %s @ ts %v: %v", op.GetValue(), roachpb.Key(key), ts, err,
 			))
 			return
 		}
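
The change to the error message swaps a raw byte slice formatted with `%v` for a typed key formatted with `%s`. The difference in output can be sketched with a hypothetical `key` type; the real `roachpb.Key` has its own pretty-printing logic, which is not reproduced here, so the `String` method below is an assumption made purely for illustration.

```go
package main

import "fmt"

// key is a hypothetical stand-in for roachpb.Key: a byte slice that
// implements fmt.Stringer. The real type formats keys differently.
type key []byte

func (k key) String() string { return string(k) }

// describeRaw formats a plain byte slice with %v, which prints the bytes as
// a list of decimal numbers.
func describeRaw(b []byte) string { return fmt.Sprintf("%v", b) }

// describeKey converts to the Stringer type first, so %s uses String().
func describeKey(b []byte) string { return fmt.Sprintf("%s", key(b)) }

func main() {
	raw := []byte("a")
	fmt.Println(describeRaw(raw)) // [97]
	fmt.Println(describeKey(raw)) // a
}
```

A readable key in the disconnect error makes it easier to correlate a rangefeed failure with the write that caused it.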
101 changes: 101 additions & 0 deletions pkg/kv/kvserver/replica_test.go
@@ -14656,3 +14656,104 @@ func TestEndTxnReplicatedLocksBumpsTSCache(t *testing.T) {
 		})
 	})
 }
+
+// TestReplayWithBumpedTimestamp serves as a regression test for the bug
+// identified in https://github.com/cockroachdb/cockroach/pull/113295. It
+// ensures that a replay with a bumped timestamp does not incorrectly
+// communicate the timestamp at which the intent was actually written -- doing
+// so can lead to infinite lock discovery cycles, as illustrated in the linked
+// issue.
+func TestReplayWithBumpedTimestamp(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	tc := testContext{}
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+	startTime := timeutil.Unix(0, 123)
+	tc.manualClock = timeutil.NewManualTime(startTime)
+	sc := TestStoreConfig(hlc.NewClockForTesting(tc.manualClock))
+	sc.TestingKnobs.DisableCanAckBeforeApplication = true
+	tc.StartWithStoreConfig(ctx, t, stopper, sc)
+
+	// We'll issue a put from txn1 at ts0.
+	k := roachpb.Key("a")
+	t0 := timeutil.Unix(1, 0)
+	tc.manualClock.MustAdvanceTo(t0)
+	txn1 := roachpb.MakeTransaction(
+		"t1", k, isolation.Serializable, roachpb.NormalUserPriority, makeTS(t0.UnixNano(), 0), 0, 0, 0,
+	)
+	pArgs := putArgs(k, []byte("value"))
+	ba := &kvpb.BatchRequest{}
+	ba.Txn = &txn1
+	ba.Add(&pArgs)
+	_, pErr := tc.Sender().Send(ctx, ba)
+	require.Nil(t, pErr)
+
+	// Sanity check the number of locks in the lock table is expected; we'll then
+	// build on this precondition below.
+	if tc.repl.concMgr.LockTableMetrics().Locks != 0 {
+		t.Fatal("unexpected number of locks")
+	}
+
+	// Un-contended replicated locks are dropped by the lock table (as evidenced
+	// by the pre-condition above). We need the replicated lock to be tracked to
+	// reconstruct the hazard described in
+	// https://github.com/cockroachdb/cockroach/pull/113295. We do so by adding a
+	// non-locking waiter for this key which will discover and pull the lock into
+	// the lock table. Note that we do so before replaying the put at a higher
+	// timestamp.
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		err := tc.store.DB().Txn(ctx, func(ctxt context.Context, txn *kv.Txn) error {
+			_, err := txn.Get(ctx, k)
+			return err
+		})
+		if err != nil {
+			t.Errorf(err.Error())
+		}
+	}()
+
+	testutils.SucceedsSoon(t, func() error {
+		if tc.repl.concMgr.LockTableMetrics().Locks != 1 {
+			return errors.New("waiting for lock to be pulled into the lock table")
+		}
+		return nil
+	})
+
+	// Replay the same put at a higher timestamp.
+	t2 := timeutil.Unix(3, 0)
+	txn1.WriteTimestamp = makeTS(t2.UnixNano(), 0)
+	_, pErr = tc.Sender().Send(ctx, ba)
+	require.Nil(t, pErr, "unexpected error : %v", pErr.GoError())
+
+	// Issue a read request at a timestamp t1, t0 < t1 < t2.
+	t1 := timeutil.Unix(2, 0)
+	tc.manualClock.MustAdvanceTo(t1)
+	// We want to ensure that txn2 doesn't end up in an infinite lock discovery
+	// cycle and is able to push txn1. We set txn2's priority to high to ensure it
+	// can successfully push txn1's timestamp instead of continuing to block
+	// indefinitely.
+	txn2 := roachpb.MakeTransaction(
+		"t2", k, isolation.Serializable, roachpb.MaxUserPriority, makeTS(t1.UnixNano(), 0), 0, 0, 0,
+	)
+	gArgs := getArgs(k)
+	ba = &kvpb.BatchRequest{}
+	ba.Txn = &txn2
+	ba.Add(&gArgs)
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		_, pErr = tc.Sender().Send(ctx, ba)
+		if pErr != nil {
+			t.Error(pErr)
+		}
+	}()
+
+	wg.Wait()
+}
21 changes: 15 additions & 6 deletions pkg/storage/mvcc.go
@@ -34,6 +34,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/admission"
+	"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
 	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
 	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -3661,7 +3662,12 @@ func MVCCPredicateDeleteRange(
 
 	var runStart, runEnd roachpb.Key
 
-	buf := make([]roachpb.Key, rangeTombstoneThreshold)
+	// buf holds keys that we might need to issue point deletes
+	// for. We copy the keys using keyAlloc, truncating keyAlloc
+	// if we don't send the point deletes and creating a new
+	// keyAlloc if we do send the point deletes.
+	var keyAlloc bufalloc.ByteAllocator
+	buf := make([]roachpb.Key, 0, rangeTombstoneThreshold)
 
 	if ms == nil {
 		return nil, errors.AssertionFailedf(
@@ -3778,6 +3784,7 @@ func MVCCPredicateDeleteRange(
 			}
 			batchByteSize += int64(MVCCRangeKey{StartKey: runStart, EndKey: runEnd, Timestamp: endTime}.EncodedSize())
 			batchSize++
+			keyAlloc.Truncate()
 		} else {
 			// Use Point tombstones
 			for i := int64(0); i < runSize; i++ {
@@ -3794,9 +3801,13 @@ func MVCCPredicateDeleteRange(
}
batchByteSize += runByteSize
batchSize += runSize
keyAlloc = bufalloc.ByteAllocator{}
}

runSize = 0
runStart = roachpb.Key{}
runEnd = roachpb.Key{}
buf = buf[:0]
return nil
}

@@ -3886,11 +3897,9 @@ func MVCCPredicateDeleteRange(
 
 		if runSize < rangeTombstoneThreshold {
 			// Only buffer keys if there's a possibility of issuing point tombstones.
-			//
-			// To avoid unecessary memory allocation, overwrite the previous key at
-			// buffer's current position. No data corruption occurs because the
-			// buffer is flushed up to runSize.
-			buf[runSize] = append(buf[runSize][:0], runEnd...)
+			var keyCopy roachpb.Key
+			keyAlloc, keyCopy = keyAlloc.Copy(runEnd, 0)
+			buf = append(buf, keyCopy)
 		}
 
 		runSize++
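
The allocator discipline introduced here (truncate when the buffered keys were not handed out, start a fresh allocator when they were) can be sketched with a toy chunk allocator. The `byteAllocator` type, its `copyKey` and `truncate` methods, and the `chunkSize` constant are hypothetical simplifications written for this illustration; they are not the `bufalloc.ByteAllocator` implementation and make no claim about its internals.

```go
package main

import "fmt"

// chunkSize is an arbitrary chunk capacity chosen for this sketch.
const chunkSize = 64

// byteAllocator is a hypothetical, simplified chunk allocator: copies are
// carved out of one shared backing array to avoid an allocation per key.
type byteAllocator struct {
	b []byte
}

// copyKey copies src into the current chunk, starting a new chunk when the
// current one is full, and returns the updated allocator plus the copy.
func (a byteAllocator) copyKey(src []byte) (byteAllocator, []byte) {
	if cap(a.b)-len(a.b) < len(src) {
		size := chunkSize
		if len(src) > size {
			size = len(src)
		}
		a.b = make([]byte, 0, size)
	}
	start := len(a.b)
	a.b = append(a.b, src...)
	// Cap the returned slice so appends to it cannot spill into later keys.
	return a, a.b[start:len(a.b):len(a.b)]
}

// truncate makes the chunk reusable. Every slice handed out so far will be
// overwritten by later copies, so this is only safe if nobody retained them.
func (a *byteAllocator) truncate() { a.b = a.b[:0] }

func main() {
	var alloc byteAllocator
	var k1, k2, k3 []byte
	alloc, k1 = alloc.copyKey([]byte("aa"))

	// The key was handed to a consumer: drop the allocator so that k1's
	// memory is never written again.
	alloc = byteAllocator{}
	alloc, k2 = alloc.copyKey([]byte("bb"))
	fmt.Printf("%s %s\n", k1, k2) // k1 still holds its original bytes

	// The key was not handed out: truncate and reuse the same chunk.
	alloc.truncate()
	alloc, k3 = alloc.copyKey([]byte("cc"))
	fmt.Printf("%s %s\n", k2, k3) // k2 now aliases the reused memory
}
```

The trade-off is that the range-tombstone path keeps reusing one chunk, while the point-delete path pays for a new chunk in exchange for keys that remain valid after they are published.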