diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index 94b435889d1e..9ea54cc31193 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -119,6 +119,7 @@ go_test( "//pkg/kv", "//pkg/kv/kvserver", "//pkg/kv/kvserver/abortspan", + "//pkg/kv/kvserver/concurrency/lock", "//pkg/kv/kvserver/gc", "//pkg/kv/kvserver/readsummary", "//pkg/kv/kvserver/readsummary/rspb", @@ -138,8 +139,10 @@ go_test( "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", + "//pkg/util/protoutil", "//pkg/util/randutil", "//pkg/util/tracing", + "//pkg/util/uint128", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_kr_pretty//:pretty", diff --git a/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go b/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go index fbc2818ac282..6b0aa088ea1d 100644 --- a/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go +++ b/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go @@ -14,6 +14,7 @@ import ( "context" "time" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -99,57 +100,54 @@ func computeMinIntentTimestamp( maxEncounteredIntentKeyBytes int64, intentCleanupThresh hlc.Timestamp, ) (hlc.Timestamp, []roachpb.Intent, error) { - iter := reader.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ - LowerBound: span.Key, - UpperBound: span.EndKey, - }) + ltStart, _ := keys.LockTableSingleKey(span.Key, nil) + ltEnd, _ := keys.LockTableSingleKey(span.EndKey, nil) + iter := reader.NewEngineIterator(storage.IterOptions{LowerBound: ltStart, UpperBound: ltEnd}) defer iter.Close() - // Iterate through all keys using NextKey. This will look at the first MVCC - // version for each key. We're only looking for MVCCMetadata versions, which - // will always be the first version of a key if it exists, so its fine that - // we skip over all other versions of keys. var meta enginepb.MVCCMetadata var minTS hlc.Timestamp var encountered []roachpb.Intent var encounteredKeyBytes int64 - for iter.SeekGE(storage.MakeMVCCMetadataKey(span.Key)); ; iter.NextKey() { - if ok, err := iter.Valid(); err != nil { + for valid, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: ltStart}); ; valid, err = iter.NextEngineKey() { + if err != nil { return hlc.Timestamp{}, nil, err - } else if !ok { + } else if !valid { break } - - // If the key is not a metadata key, ignore it. - unsafeKey := iter.UnsafeKey() - if unsafeKey.IsValue() { + engineKey, err := iter.EngineKey() + if err != nil { continue } - - // Found a metadata key. Unmarshal. + lockedKey, err := keys.DecodeLockTableSingleKey(engineKey.Key) + if err != nil { + return hlc.Timestamp{}, nil, errors.Wrapf(err, "decoding LockTable key: %v", lockedKey) + } + // Unmarshal. if err := protoutil.Unmarshal(iter.UnsafeValue(), &meta); err != nil { - return hlc.Timestamp{}, nil, errors.Wrapf(err, "unmarshaling mvcc meta: %v", unsafeKey) + return hlc.Timestamp{}, nil, errors.Wrapf(err, "unmarshaling mvcc meta: %v", lockedKey) + } + if meta.Txn == nil { + return hlc.Timestamp{}, nil, + errors.AssertionFailedf("nil transaction in LockTable. Key: %v,"+"mvcc meta: %v", + lockedKey, meta) } - // If this is an intent, account for it. - if meta.Txn != nil { - if minTS.IsEmpty() { - minTS = meta.Txn.WriteTimestamp - } else { - minTS.Backward(meta.Txn.WriteTimestamp) - } + if minTS.IsEmpty() { + minTS = meta.Txn.WriteTimestamp + } else { + minTS.Backward(meta.Txn.WriteTimestamp) + } - // Also, add the intent to the encountered intents set if it is old enough - // and we have room, both in terms of the number of intents and the size - // of the intent keys. - oldEnough := meta.Txn.WriteTimestamp.Less(intentCleanupThresh) - intentFitsByCount := int64(len(encountered)) < maxEncounteredIntents - intentFitsByBytes := encounteredKeyBytes < maxEncounteredIntentKeyBytes - if oldEnough && intentFitsByCount && intentFitsByBytes { - key := iter.Key().Key - encountered = append(encountered, roachpb.MakeIntent(meta.Txn, key)) - encounteredKeyBytes += int64(len(key)) - } + // Also, add the intent to the encountered intents set if it is old enough + // and we have room, both in terms of the number of intents and the size + // of the intent keys. + oldEnough := meta.Txn.WriteTimestamp.Less(intentCleanupThresh) + intentFitsByCount := int64(len(encountered)) < maxEncounteredIntents + intentFitsByBytes := encounteredKeyBytes < maxEncounteredIntentKeyBytes + if oldEnough && intentFitsByCount && intentFitsByBytes { + encountered = append(encountered, roachpb.MakeIntent(meta.Txn, lockedKey)) + encounteredKeyBytes += int64(len(lockedKey)) } } return minTS, encountered, nil diff --git a/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp_test.go b/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp_test.go index 813054ac5a18..52ba28f13701 100644 --- a/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp_test.go @@ -15,13 +15,18 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/uint128" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/stretchr/testify/require" ) @@ -30,7 +35,7 @@ func TestQueryResolvedTimestamp(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - db := storage.NewDefaultInMemForTesting() + db := storage.NewInMemForTesting(true) defer db.Close() makeTS := func(ts int64) hlc.Timestamp { @@ -47,7 +52,7 @@ func TestQueryResolvedTimestamp(t *testing.T) { require.NoError(t, storage.MVCCDelete(ctx, db, nil, roachpb.Key(k), hlc.Timestamp{}, nil)) } - // Setup: + // Setup: (with separated intents the actual key layout in the store is not what is listed below.) // // a: intent @ 5 // a: value @ 3 @@ -206,3 +211,59 @@ func TestQueryResolvedTimestamp(t *testing.T) { }) } } + +func TestQueryResolvedTimestampErrors(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + db := storage.NewInMemForTesting(true) + defer db.Close() + + txnUUID := uuid.FromUint128(uint128.FromInts(0, 12345)) + + lockTableKey := storage.LockTableKey{ + Key: roachpb.Key("a"), + Strength: lock.Exclusive, + TxnUUID: txnUUID.GetBytes(), + } + engineKey, buf := lockTableKey.ToEngineKey(nil) + + st := cluster.MakeTestingClusterSettings() + + manual := hlc.NewManualClock(10) + clock := hlc.NewClock(manual.UnixNano, time.Nanosecond) + + evalCtx := &MockEvalCtx{ + ClusterSettings: st, + Clock: clock, + ClosedTimestamp: hlc.Timestamp{WallTime: 5}, + } + cArgs := CommandArgs{ + EvalCtx: evalCtx.EvalContext(), + Args: &roachpb.QueryResolvedTimestampRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: roachpb.Key("a"), + EndKey: roachpb.Key("z"), + }, + }, + } + var resp roachpb.QueryResolvedTimestampResponse + t.Run("LockTable entry without MVCC metadata", func(t *testing.T) { + require.NoError(t, db.PutEngineKey(engineKey, buf)) + _, err := QueryResolvedTimestamp(ctx, db, cArgs, &resp) + require.Error(t, err) + require.Regexp(t, "unmarshaling mvcc meta", err.Error()) + }) + t.Run("LockTable entry without txn in metadata", func(t *testing.T) { + var meta enginepb.MVCCMetadata + // we're omitting meta.TxnMeta + val, err := protoutil.Marshal(&meta) + require.NoError(t, err) + require.NoError(t, db.PutEngineKey(engineKey, val)) + resp.Reset() + _, err = QueryResolvedTimestamp(ctx, db, cArgs, &resp) + require.Error(t, err) + require.Regexp(t, "nil transaction in LockTable", err.Error()) + }) +} diff --git a/pkg/storage/in_mem.go b/pkg/storage/in_mem.go index da4405508e8b..fac52a8f5461 100644 --- a/pkg/storage/in_mem.go +++ b/pkg/storage/in_mem.go @@ -42,9 +42,10 @@ func InMemFromFS(ctx context.Context, fs vfs.FS, dir string, opts ...ConfigOptio // other than configuring separated intents. So the fact that we have two // inconsistent cluster.Settings is harmless. -// NewDefaultInMemForTesting allocates and returns a new, opened in-memory engine with -// the default configuration. The caller must call the engine's Close method -// when the engine is no longer needed. +// NewDefaultInMemForTesting allocates and returns a new, opened in-memory +// engine with the default configuration. The caller must call the engine's +// Close method when the engine is no longer needed. This method randomizes +// whether separated intents are written. func NewDefaultInMemForTesting(opts ...ConfigOption) Engine { eng, err := Open(context.Background(), InMemory(), ForTesting, MaxSize(1<<20), CombineOptions(opts...)) if err != nil { @@ -52,3 +53,14 @@ func NewDefaultInMemForTesting(opts ...ConfigOption) Engine { } return eng } + +// NewInMemForTesting is just like NewDefaultInMemForTesting but allows to +// deterministically define whether it separates intents from MVCC data. +func NewInMemForTesting(enableSeparatedIntents bool, opts ...ConfigOption) Engine { + eng, err := Open(context.Background(), InMemory(), + SetSeparatedIntents(!enableSeparatedIntents), MaxSize(1<<20), CombineOptions(opts...)) + if err != nil { + panic(err) + } + return eng +}