diff --git a/pkg/kv/kvserver/batcheval/cmd_clear_range.go b/pkg/kv/kvserver/batcheval/cmd_clear_range.go index 735c63dd3e0a..ee21e0465e15 100644 --- a/pkg/kv/kvserver/batcheval/cmd_clear_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_clear_range.go @@ -41,7 +41,7 @@ func declareKeysClearRange( req roachpb.Request, latchSpans, lockSpans *spanset.SpanSet, ) { - DefaultDeclareKeys(rs, header, req, latchSpans, lockSpans) + DefaultDeclareIsolatedKeys(rs, header, req, latchSpans, lockSpans) // We look up the range descriptor key to check whether the span // is equal to the entire range for fast stats updating. latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(rs.GetStartKey())}) @@ -74,6 +74,32 @@ func ClearRange( } } + // Check for any intents, and return them for the caller to resolve. This + // prevents removal of intents belonging to implicitly committed STAGING + // txns. Otherwise, txn recovery would fail to find these intents and + // consider the txn incomplete, uncommitting it and its writes (even those + // outside of the cleared range). + // + // We return 1000 at a time, or 1 MB. The intent resolver currently + // processes intents in batches of 100, so this gives it a few to chew on. + // + // NOTE: This only takes into account separated intents, which are currently + // not enabled by default. For interleaved intents we would have to do full + // range scans, which would be too expensive. We could mitigate this by + // relying on statistics to skip scans when no intents are known, but due + // to #60585 we are often likely to encounter intents. 
See discussion in: + // https://github.com/cockroachdb/cockroach/pull/61850 + var ( + maxIntents int64 = 1000 + intentBytes int64 = 1e6 + ) + intents, err := storage.ScanSeparatedIntents(readWriter, from, to, maxIntents, intentBytes) + if err != nil { + return result.Result{}, err + } else if len(intents) > 0 { + return result.Result{}, &roachpb.WriteIntentError{Intents: intents} + } + // Before clearing, compute the delta in MVCCStats. statsDelta, err := computeStatsDelta(ctx, readWriter, cArgs, from, to) if err != nil { diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 2cb9dca00c33..f2c5743b1d52 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -6245,6 +6245,7 @@ func TestRangeStatsComputation(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(context.Background()) tc.Start(t, stopper) + ctx := context.Background() baseStats := initialStats() // The initial stats contain an empty lease and no prior read summary, but @@ -6311,7 +6312,7 @@ func TestRangeStatsComputation(t *testing.T) { // Account for TxnDidNotUpdateMeta expMS.LiveBytes += 2 expMS.ValBytes += 2 - if tc.engine.IsSeparatedIntentsEnabledForTesting() { + if tc.engine.IsSeparatedIntentsEnabledForTesting(ctx) { expMS.SeparatedIntentCount++ } } diff --git a/pkg/kv/kvserver/txn_recovery_integration_test.go b/pkg/kv/kvserver/txn_recovery_integration_test.go index 47b10c23fac0..b6622847c76d 100644 --- a/pkg/kv/kvserver/txn_recovery_integration_test.go +++ b/pkg/kv/kvserver/txn_recovery_integration_test.go @@ -197,16 +197,16 @@ func TestTxnRecoveryFromStaging(t *testing.T) { } } -// TestTxnClearRangeIntents tests whether a ClearRange call over a range -// containing a write intent from an implicitly committed txn can cause the -// intent to be removed such that txn recovery ends up rolling back a committed -// txn. 
😱 This isn't strictly a bug, since ClearRange documents this and -// requires the caller to ensure there are no intents in the cleared range. This -// test verifies the behavior. +// TestTxnClearRangeIntents tests whether a ClearRange call blindly removes +// write intents. This can cause it to remove an intent from an implicitly +// committed STAGING txn. When txn recovery kicks in, it will fail to find the +// expected intent, causing it to roll back a committed txn (including any +// values outside of the cleared range). // -// This is a footgun, ClearRange should make sure txn invariants are never -// violated. For ideas, see: -// https://github.com/cockroachdb/cockroach/issues/46764 +// Because the fix for this relies on separated intents, the bug will continue +// to be present until the planned migration in 21.2. Since tests currently +// enable separated intents at random, we assert the buggy behavior when these +// are disabled. See also: https://github.com/cockroachdb/cockroach/issues/46764 func TestTxnClearRangeIntents(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -268,18 +268,47 @@ func TestTxnClearRangeIntents(t *testing.T) { _, pErr = kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{}, &clearRange) require.Nil(t, pErr, "error: %s", pErr) - // Try to read A. This should have been committed, but because we cleared - // B's intent above txn recovery will fail to find an in-flight write for B - // and thus roll back the entire txn (including A) even though it has been - // implicitly committed above. 😱 - get := getArgs(keyA) - reply, pErr = kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{}, &get) - require.Nil(t, pErr, "error: %s", pErr) - require.Nil(t, reply.(*roachpb.GetResponse).Value, "unexpected value for key %q", keyA) + // If separated intents are enabled, all should be well. + if store.engine.IsSeparatedIntentsEnabledForTesting(ctx) { + // Reading A should succeed, but B should be gone. 
+ get := getArgs(keyA) + reply, pErr = kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{}, &get) + require.Nil(t, pErr, "error: %s", pErr) + require.NotNil(t, reply.(*roachpb.GetResponse).Value, "expected value for A") + value, err := reply.(*roachpb.GetResponse).Value.GetBytes() + require.NoError(t, err) + require.Equal(t, value, valueA) - // Query the original transaction, which should now be aborted. - queryTxn := queryTxnArgs(txn.TxnMeta, false) - reply, pErr = kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{}, &queryTxn) - require.Nil(t, pErr, "error: %s", pErr) - require.Equal(t, roachpb.ABORTED, reply.(*roachpb.QueryTxnResponse).QueriedTxn.Status) + get = getArgs(keyB) + reply, pErr = kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{}, &get) + require.Nil(t, pErr, "error: %s", pErr) + require.Nil(t, reply.(*roachpb.GetResponse).Value, "unexpected value for B") + + // Query the original transaction, which should now be committed. + queryTxn := queryTxnArgs(txn.TxnMeta, false) + reply, pErr = kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{}, &queryTxn) + require.Nil(t, pErr, "error: %s", pErr) + require.Equal(t, roachpb.COMMITTED, reply.(*roachpb.QueryTxnResponse).QueriedTxn.Status) + + } else { + // If separated intents are disabled, ClearRange will have removed B's + // intent without resolving it. When we read A, txn recovery will expect + // to find B's intent, but when missing it assumes the txn did not + // complete and aborts it, rolling back all writes (including A). 
+ get := getArgs(keyA) + reply, pErr = kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{}, &get) + require.Nil(t, pErr, "error: %s", pErr) + require.Nil(t, reply.(*roachpb.GetResponse).Value, "unexpected value for A") + + get = getArgs(keyB) + reply, pErr = kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{}, &get) + require.Nil(t, pErr, "error: %s", pErr) + require.Nil(t, reply.(*roachpb.GetResponse).Value, "unexpected value for B") + + // Query the original transaction, which should now be aborted. + queryTxn := queryTxnArgs(txn.TxnMeta, false) + reply, pErr = kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{}, &queryTxn) + require.Nil(t, pErr, "error: %s", pErr) + require.Equal(t, roachpb.ABORTED, reply.(*roachpb.QueryTxnResponse).QueriedTxn.Status) + } } diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 2f544f83a99a..ff783da82ab3 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -11,11 +11,13 @@ package storage import ( + "bytes" "context" "fmt" "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -715,7 +717,7 @@ type Engine interface { // IsSeparatedIntentsEnabledForTesting is a test only method used in tests // that know that this enabled setting is not changing and need the value to // adjust their expectations. - IsSeparatedIntentsEnabledForTesting() bool + IsSeparatedIntentsEnabledForTesting(ctx context.Context) bool } // Batch is the interface for batch specific operations. @@ -861,6 +863,53 @@ func Scan(reader Reader, start, end roachpb.Key, max int64) ([]MVCCKeyValue, err return kvs, err } +// ScanSeparatedIntents scans intents using only the separated intents lock +// table. It does not take interleaved intents into account at all. 
+// It returns at most max intents and stops early once their keys and values
+// reach targetBytes; 0 disables a limit, and a negative max returns nothing.
+//
+// TODO(erikgrinaker): When we are fully migrated to separated intents, this
+// should be renamed ScanIntents.
+func ScanSeparatedIntents(
+	reader Reader, start, end roachpb.Key, max int64, targetBytes int64,
+) ([]roachpb.Intent, error) {
+	if bytes.Compare(start, end) >= 0 {
+		return []roachpb.Intent{}, nil
+	}
+
+	ltStart, _ := keys.LockTableSingleKey(start, nil)
+	ltEnd, _ := keys.LockTableSingleKey(end, nil)
+	iter := reader.NewEngineIterator(IterOptions{LowerBound: ltStart, UpperBound: ltEnd})
+	defer iter.Close()
+
+	intents := []roachpb.Intent{}
+	var intentBytes int64
+	var meta enginepb.MVCCMetadata
+	valid, err := iter.SeekEngineKeyGE(EngineKey{Key: ltStart})
+	for ; valid && max >= 0; valid, err = iter.NextEngineKey() { // negative max: scan nothing
+		key, keyErr := iter.EngineKey()
+		if keyErr != nil {
+			return nil, keyErr
+		}
+		lockedKey, decodeErr := keys.DecodeLockTableSingleKey(key.Key)
+		if decodeErr != nil {
+			return nil, decodeErr
+		}
+		if metaErr := protoutil.Unmarshal(iter.UnsafeValue(), &meta); metaErr != nil {
+			return nil, metaErr
+		}
+		intents = append(intents, roachpb.MakeIntent(meta.Txn, lockedKey))
+		intentBytes += int64(len(lockedKey)) + int64(len(iter.UnsafeValue())) // length only
+		if (max > 0 && int64(len(intents)) >= max) || (targetBytes > 0 && intentBytes >= targetBytes) {
+			break
+		}
+	}
+	if err != nil {
+		return nil, err // seek/next failed: never hand back a partial result
+	}
+	return intents, nil
+}
+
 // WriteSyncNoop carries out a synchronous no-op write to the engine.
func WriteSyncNoop(ctx context.Context, eng Engine) error { batch := eng.NewBatch() diff --git a/pkg/storage/engine_test.go b/pkg/storage/engine_test.go index b06a266b581f..0735c636a7c7 100644 --- a/pkg/storage/engine_test.go +++ b/pkg/storage/engine_test.go @@ -1589,3 +1589,67 @@ func TestFS(t *testing.T) { }) } } + +func TestScanSeparatedIntents(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + maxKey := keys.MaxKey + + keys := []roachpb.Key{ + roachpb.Key("a"), + roachpb.Key("b"), + roachpb.Key("c"), + } + testcases := map[string]struct { + from roachpb.Key + to roachpb.Key + max int64 + targetBytes int64 + expectIntents []roachpb.Key + }{ + "no keys": {keys[0], keys[0], 0, 0, keys[:0]}, + "one key": {keys[0], keys[1], 0, 0, keys[0:1]}, + "two keys": {keys[0], keys[2], 0, 0, keys[0:2]}, + "all keys": {keys[0], maxKey, 0, 0, keys}, + "offset mid": {keys[1], keys[2], 0, 0, keys[1:2]}, + "offset last": {keys[2], maxKey, 0, 0, keys[2:]}, + "offset post": {roachpb.Key("x"), maxKey, 0, 0, []roachpb.Key{}}, + "nil end": {keys[0], nil, 0, 0, []roachpb.Key{}}, + "limit keys": {keys[0], maxKey, 2, 0, keys[0:2]}, + "one byte": {keys[0], maxKey, 0, 1, keys[0:1]}, + "80 bytes": {keys[0], maxKey, 0, 80, keys[0:2]}, + "80 bytes or one": {keys[0], maxKey, 1, 80, keys[0:1]}, + "1000 bytes": {keys[0], maxKey, 0, 1000, keys}, + } + + for name, enableSeparatedIntents := range map[string]bool{"interleaved": false, "separated": true} { + t.Run(name, func(t *testing.T) { + settings := makeSettingsForSeparatedIntents(false, enableSeparatedIntents) + eng := newPebbleInMem(ctx, roachpb.Attributes{}, 1<<20, settings) + defer eng.Close() + + for _, key := range keys { + err := MVCCPut(ctx, eng, nil, key, txn1.ReadTimestamp, roachpb.Value{RawBytes: key}, txn1) + require.NoError(t, err) + } + + for name, tc := range testcases { + tc := tc + t.Run(name, func(t *testing.T) { + intents, err := ScanSeparatedIntents(eng, tc.from, 
tc.to, tc.max, tc.targetBytes) + require.NoError(t, err) + if enableSeparatedIntents { + require.Len(t, intents, len(tc.expectIntents), "unexpected number of separated intents") + for i, intent := range intents { + require.Equal(t, tc.expectIntents[i], intent.Key) + } + } else { + require.Empty(t, intents) + } + }) + } + }) + } +} diff --git a/pkg/storage/mvcc_stats_test.go b/pkg/storage/mvcc_stats_test.go index 2a7a31b632bd..147cacbafd25 100644 --- a/pkg/storage/mvcc_stats_test.go +++ b/pkg/storage/mvcc_stats_test.go @@ -200,7 +200,7 @@ func TestMVCCStatsPutCommitMovesTimestamp(t *testing.T) { if accountForTxnDidNotUpdateMeta(t, engine) { // Account for TxnDidNotUpdateMeta mValSize += 2 - if engine.IsSeparatedIntentsEnabledForTesting() { + if engine.IsSeparatedIntentsEnabledForTesting(ctx) { separatedIntentCount = 1 } } @@ -289,7 +289,7 @@ func TestMVCCStatsPutPushMovesTimestamp(t *testing.T) { if accountForTxnDidNotUpdateMeta(t, engine) { // Account for TxnDidNotUpdateMeta mValSize += 2 - if engine.IsSeparatedIntentsEnabledForTesting() { + if engine.IsSeparatedIntentsEnabledForTesting(ctx) { separatedIntentCount = 1 } } @@ -391,7 +391,7 @@ func TestMVCCStatsDeleteMovesTimestamp(t *testing.T) { if accountForTxnDidNotUpdateMeta(t, engine) { // Account for TxnDidNotUpdateMeta mVal1Size += 2 - if engine.IsSeparatedIntentsEnabledForTesting() { + if engine.IsSeparatedIntentsEnabledForTesting(ctx) { separatedIntentCount = 1 } } @@ -521,7 +521,7 @@ func TestMVCCStatsPutMovesDeletionTimestamp(t *testing.T) { if accountForTxnDidNotUpdateMeta(t, engine) { // Account for TxnDidNotUpdateMeta mVal1Size += 2 - if engine.IsSeparatedIntentsEnabledForTesting() { + if engine.IsSeparatedIntentsEnabledForTesting(ctx) { separatedIntentCount = 1 } } @@ -657,7 +657,7 @@ func TestMVCCStatsDelDelCommitMovesTimestamp(t *testing.T) { if accountForTxnDidNotUpdateMeta(t, engine) { // Account for TxnDidNotUpdateMeta mValSize += 2 - if engine.IsSeparatedIntentsEnabledForTesting() { + if 
engine.IsSeparatedIntentsEnabledForTesting(ctx) { separatedIntentCount = 1 } } @@ -813,7 +813,7 @@ func TestMVCCStatsPutDelPutMovesTimestamp(t *testing.T) { if accountForTxnDidNotUpdateMeta(t, engine) { // Account for TxnDidNotUpdateMeta mValSize += 2 - if engine.IsSeparatedIntentsEnabledForTesting() { + if engine.IsSeparatedIntentsEnabledForTesting(ctx) { separatedIntentCount = 1 } } @@ -1040,7 +1040,7 @@ func TestMVCCStatsPutIntentTimestampNotPutTimestamp(t *testing.T) { if accountForTxnDidNotUpdateMeta(t, engine) { // Account for TxnDidNotUpdateMeta m1ValSize += 2 - if engine.IsSeparatedIntentsEnabledForTesting() { + if engine.IsSeparatedIntentsEnabledForTesting(ctx) { separatedIntentCount = 1 } } diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index a85e0e1d3557..200f7be7bc8c 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -24,6 +24,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -900,8 +901,9 @@ func (p *Pebble) SafeToWriteSeparatedIntents(ctx context.Context) (bool, error) } // IsSeparatedIntentsEnabledForTesting implements the Engine interface. -func (p *Pebble) IsSeparatedIntentsEnabledForTesting() bool { - return SeparatedIntentsEnabled.Get(&p.settings.SV) +func (p *Pebble) IsSeparatedIntentsEnabledForTesting(ctx context.Context) bool { + return !p.settings.Version.ActiveVersionOrEmpty(ctx).Less( + clusterversion.ByKey(clusterversion.SeparatedIntents)) && SeparatedIntentsEnabled.Get(&p.settings.SV) } func (p *Pebble) put(key MVCCKey, value []byte) error {