diff --git a/pkg/ccl/storageccl/export.go b/pkg/ccl/storageccl/export.go index 0ff18c2290af..0e9c1a79641d 100644 --- a/pkg/ccl/storageccl/export.go +++ b/pkg/ccl/storageccl/export.go @@ -35,7 +35,7 @@ func declareKeysExport( desc *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, ) { batcheval.DefaultDeclareKeys(desc, header, req, spans) - spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeLastGCKey(header.RangeID)}) + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeLastGCKey(header.RangeID)}) } // evalExport dumps the requested keys into files of non-overlapping key ranges diff --git a/pkg/storage/batch_spanset_test.go b/pkg/storage/batch_spanset_test.go index 4066fa28ed2b..f8dcaa666531 100644 --- a/pkg/storage/batch_spanset_test.go +++ b/pkg/storage/batch_spanset_test.go @@ -27,13 +27,13 @@ import ( "github.com/pkg/errors" ) -func TestSpanSetBatch(t *testing.T) { +func TestSpanSetBatchBoundaries(t *testing.T) { defer leaktest.AfterTest(t)() eng := engine.NewDefaultInMem() defer eng.Close() var ss spanset.SpanSet - ss.Add(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")}) + ss.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")}) outsideKey := engine.MakeMVCCMetadataKey(roachpb.Key("a")) outsideKey2 := engine.MakeMVCCMetadataKey(roachpb.Key("b")) outsideKey3 := engine.MakeMVCCMetadataKey(roachpb.Key("m")) @@ -60,7 +60,7 @@ func TestSpanSetBatch(t *testing.T) { } // Writes outside the range fail. We try to cover all write methods - // in the failure case to make sure the checkAllowed call is + // in the failure case to make sure the CheckAllowed call is // present, but we don't attempt successful versions of all // methods since those are harder to set up. isWriteSpanErr := func(err error) bool { @@ -190,6 +190,247 @@ func TestSpanSetBatch(t *testing.T) { } } +func TestSpanSetBatchTimestamps(t *testing.T) { + defer leaktest.AfterTest(t)() + eng := engine.NewDefaultInMem() + defer eng.Close() + + var ss spanset.SpanSet + ss.AddMVCC(spanset.SpanReadOnly, + roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")}, hlc.Timestamp{WallTime: 2}) + ss.AddMVCC(spanset.SpanReadWrite, + roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("f")}, hlc.Timestamp{WallTime: 2}) + + rkey := engine.MakeMVCCMetadataKey(roachpb.Key("b")) + wkey := engine.MakeMVCCMetadataKey(roachpb.Key("e")) + + value := []byte("value") + + // Write value that we can try to read later. + if err := eng.Put(rkey, value); err != nil { + t.Fatalf("direct write failed: %+v", err) + } + + batchNonMVCC := spanset.NewBatchAt(eng.NewBatch(), &ss, hlc.Timestamp{WallTime: 0}) + defer batchNonMVCC.Close() + + batchBefore := spanset.NewBatchAt(eng.NewBatch(), &ss, hlc.Timestamp{WallTime: 1}) + defer batchBefore.Close() + + batchDuring := spanset.NewBatchAt(eng.NewBatch(), &ss, hlc.Timestamp{WallTime: 2}) + defer batchDuring.Close() + + batchAfter := spanset.NewBatchAt(eng.NewBatch(), &ss, hlc.Timestamp{WallTime: 3}) + defer batchAfter.Close() + + // Writes. + if err := batchDuring.Put(wkey, value); err != nil { + t.Fatalf("failed to write inside the range at same ts as latch declaration: %+v", err) + } + + for _, batch := range []engine.Batch{batchAfter, batchBefore, batchNonMVCC} { + if err := batch.Put(wkey, value); err == nil { + t.Fatalf("was able to write inside the range at ts greater than latch declaration: %+v", err) + } + } + + // We try to cover all write methods in the failure case to make sure + // the CheckAllowedAt call is present, but we don't attempt to successful + // versions of all methods since those are harder to set up. + isWriteSpanErr := func(err error) bool { + return testutils.IsError(err, "cannot write undeclared span") + } + + for _, batch := range []engine.Batch{batchAfter, batchBefore, batchNonMVCC} { + if err := batch.Clear(wkey); !isWriteSpanErr(err) { + t.Errorf("Clear: unexpected error %v", err) + } + { + iter := batch.NewIterator(engine.IterOptions{UpperBound: roachpb.KeyMax}) + err := batch.ClearIterRange(iter, wkey.Key, wkey.Key) + iter.Close() + if !isWriteSpanErr(err) { + t.Errorf("ClearIterRange: unexpected error %v", err) + } + } + if err := batch.Merge(wkey, nil); !isWriteSpanErr(err) { + t.Errorf("Merge: unexpected error %v", err) + } + if err := batch.Put(wkey, nil); !isWriteSpanErr(err) { + t.Errorf("Put: unexpected error %v", err) + } + } + + // Reads. + for _, batch := range []engine.Batch{batchBefore, batchDuring} { + //lint:ignore SA1019 historical usage of deprecated batch.Get is OK + if res, err := batch.Get(rkey); err != nil { + t.Errorf("failed to read inside the range: %+v", err) + } else if !bytes.Equal(res, value) { + t.Errorf("failed to read previously written value, got %q", res) + } + } + + isReadSpanErr := func(err error) bool { + return testutils.IsError(err, "cannot read undeclared span") + } + + for _, batch := range []engine.Batch{batchAfter, batchNonMVCC} { + //lint:ignore SA1019 historical usage of deprecated batch.Get is OK + if _, err := batch.Get(rkey); !isReadSpanErr(err) { + t.Errorf("Get: unexpected error %v", err) + } + + //lint:ignore SA1019 historical usage of deprecated batch.GetProto is OK + if _, _, _, err := batch.GetProto(rkey, nil); !isReadSpanErr(err) { + t.Errorf("GetProto: unexpected error %v", err) + } + if err := batch.Iterate(rkey.Key, rkey.Key, + func(v engine.MVCCKeyValue) (bool, error) { + return false, errors.Errorf("unexpected callback: %v", v) + }, + ); !isReadSpanErr(err) { + t.Errorf("Iterate: unexpected error %v", err) + } + } +} + +func TestSpanSetIteratorTimestamps(t *testing.T) { + defer leaktest.AfterTest(t)() + eng := engine.NewDefaultInMem() + defer eng.Close() + + var ss spanset.SpanSet + ss.AddMVCC(spanset.SpanReadOnly, roachpb.Span{ + Key: roachpb.Key("a"), EndKey: roachpb.Key("c")}, hlc.Timestamp{WallTime: 1}) + ss.AddMVCC(spanset.SpanReadOnly, roachpb.Span{ + Key: roachpb.Key("c"), EndKey: roachpb.Key("e")}, hlc.Timestamp{WallTime: 2}) + + k1, v1 := engine.MakeMVCCMetadataKey(roachpb.Key("b")), []byte("b-value") + k2, v2 := engine.MakeMVCCMetadataKey(roachpb.Key("d")), []byte("d-value") + + // Write values that we can try to read later. + if err := eng.Put(k1, v1); err != nil { + t.Fatalf("direct write failed: %+v", err) + } + if err := eng.Put(k2, v2); err != nil { + t.Fatalf("direct write failed: %+v", err) + } + + batchNonMVCC := spanset.NewBatchAt(eng.NewBatch(), &ss, hlc.Timestamp{WallTime: 0}) + defer batchNonMVCC.Close() + + batchAt1 := spanset.NewBatchAt(eng.NewBatch(), &ss, hlc.Timestamp{WallTime: 1}) + defer batchAt1.Close() + + batchAt2 := spanset.NewBatchAt(eng.NewBatch(), &ss, hlc.Timestamp{WallTime: 2}) + defer batchAt2.Close() + + batchAt3 := spanset.NewBatchAt(eng.NewBatch(), &ss, hlc.Timestamp{WallTime: 3}) + defer batchAt3.Close() + + func() { + // When accessing at t=1, we're able to read through latches declared at t=1 and t=2. + iter := batchAt1.NewIterator(engine.IterOptions{UpperBound: roachpb.KeyMax}) + defer iter.Close() + + iter.Seek(k1) + if ok, err := iter.Valid(); !ok { + t.Fatalf("expected valid iterator, err=%v", err) + } + if !reflect.DeepEqual(iter.Key(), k1) { + t.Fatalf("expected key %s, got %s", k1, iter.Key()) + } + + iter.Next() + if ok, err := iter.Valid(); !ok { + t.Fatalf("expected valid iterator, err=%v", err) + } + if !reflect.DeepEqual(iter.Key(), k2) { + t.Fatalf("expected key %s, got %s", k2, iter.Key()) + } + }() + + { + // When accessing at t=2, we're only able to read through the latch declared at t=2. + iter := batchAt2.NewIterator(engine.IterOptions{UpperBound: roachpb.KeyMax}) + defer iter.Close() + + iter.Seek(k1) + if ok, _ := iter.Valid(); ok { + t.Fatalf("expected invalid iterator; found valid at key %s", iter.Key()) + } + + iter.Seek(k2) + if ok, err := iter.Valid(); !ok { + t.Fatalf("expected valid iterator, err=%v", err) + } + if !reflect.DeepEqual(iter.Key(), k2) { + t.Fatalf("expected key %s, got %s", k2, iter.Key()) + } + } + + for _, batch := range []engine.Batch{batchAt3, batchNonMVCC} { + // When accessing at t=3, we're unable to read through any of the declared latches. + // Same is true when accessing without a timestamp. + iter := batch.NewIterator(engine.IterOptions{UpperBound: roachpb.KeyMax}) + defer iter.Close() + + iter.Seek(k1) + if ok, _ := iter.Valid(); ok { + t.Fatalf("expected invalid iterator; found valid at key %s", iter.Key()) + } + + iter.Seek(k2) + if ok, _ := iter.Valid(); ok { + t.Fatalf("expected invalid iterator; found valid at key %s", iter.Key()) + } + } +} + +func TestSpanSetNonMVCCBatch(t *testing.T) { + defer leaktest.AfterTest(t)() + eng := engine.NewDefaultInMem() + defer eng.Close() + + var ss spanset.SpanSet + ss.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")}) + ss.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("f")}) + + rkey := engine.MakeMVCCMetadataKey(roachpb.Key("b")) + wkey := engine.MakeMVCCMetadataKey(roachpb.Key("e")) + + value := []byte("value") + + // Write value that we can try to read later. + if err := eng.Put(rkey, value); err != nil { + t.Fatalf("direct write failed: %+v", err) + } + + batchNonMVCC := spanset.NewBatch(eng.NewBatch(), &ss) + defer batchNonMVCC.Close() + + batchMVCC := spanset.NewBatchAt(eng.NewBatch(), &ss, hlc.Timestamp{WallTime: 1}) + defer batchMVCC.Close() + + // Writes. + for _, batch := range []engine.Batch{batchNonMVCC, batchMVCC} { + if err := batch.Put(wkey, value); err != nil { + t.Fatalf("write disallowed through non-MVCC latch: %+v", err) + } + } + + // Reads. + for _, batch := range []engine.Batch{batchNonMVCC, batchMVCC} { + //lint:ignore SA1019 historical usage of deprecated batch.Get is OK + if res, err := batch.Get(rkey); err != nil { + t.Errorf("read disallowed through non-MVCC latch: %+v", err) + } else if !bytes.Equal(res, value) { + t.Errorf("failed to read previously written value, got %q", res) + } + } +} + // TestSpanSetMVCCResolveWriteIntentRangeUsingIter verifies that // MVCCResolveWriteIntentRangeUsingIter does not stray outside of the passed-in // key range (which it only used to do in this corner case tested here). @@ -217,7 +458,7 @@ func TestSpanSetMVCCResolveWriteIntentRangeUsingIter(t *testing.T) { } var ss spanset.SpanSet - ss.Add(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b\x00")}) + ss.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b\x00")}) batch := spanset.NewBatch(eng.NewBatch(), &ss) defer batch.Close() diff --git a/pkg/storage/batcheval/cmd_begin_transaction.go b/pkg/storage/batcheval/cmd_begin_transaction.go index 4866554d6f75..dbbe50d9f966 100644 --- a/pkg/storage/batcheval/cmd_begin_transaction.go +++ b/pkg/storage/batcheval/cmd_begin_transaction.go @@ -33,7 +33,7 @@ func declareKeysWriteTransaction( ) { if header.Txn != nil { header.Txn.AssertInitialized(context.TODO()) - spans.Add(spanset.SpanReadWrite, roachpb.Span{ + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{ Key: keys.TransactionKey(req.Header().Key, header.Txn.ID), }) } @@ -43,7 +43,7 @@ func declareKeysBeginTransaction( desc *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, ) { declareKeysWriteTransaction(desc, header, req, spans) - spans.Add(spanset.SpanReadOnly, roachpb.Span{ + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{ Key: keys.AbortSpanKey(header.RangeID, header.Txn.ID), }) } diff --git a/pkg/storage/batcheval/cmd_clear_range.go b/pkg/storage/batcheval/cmd_clear_range.go index b3a683dc4e89..e5b102f9b02e 100644 --- a/pkg/storage/batcheval/cmd_clear_range.go +++ b/pkg/storage/batcheval/cmd_clear_range.go @@ -42,7 +42,7 @@ func declareKeysClearRange( DefaultDeclareKeys(desc, header, req, spans) // We look up the range descriptor key to check whether the span // is equal to the entire range for fast stats updating. - spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)}) + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)}) } // ClearRange wipes all MVCC versions of keys covered by the specified diff --git a/pkg/storage/batcheval/cmd_delete_range.go b/pkg/storage/batcheval/cmd_delete_range.go index af1155c9ce41..02802510f6c1 100644 --- a/pkg/storage/batcheval/cmd_delete_range.go +++ b/pkg/storage/batcheval/cmd_delete_range.go @@ -13,14 +13,29 @@ package batcheval import ( "context" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) func init() { - RegisterCommand(roachpb.DeleteRange, DefaultDeclareKeys, DeleteRange) + RegisterCommand(roachpb.DeleteRange, declareKeysDeleteRange, DeleteRange) +} + +func declareKeysDeleteRange( + _ *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, +) { + args := req.(*roachpb.DeleteRangeRequest) + access := spanset.SpanReadWrite + + if args.Inline || keys.IsLocal(req.Header().Span().Key) { + spans.AddNonMVCC(access, req.Header().Span()) + } else { + spans.AddMVCC(access, req.Header().Span(), header.Timestamp) + } } // DeleteRange deletes the range of key/value pairs specified by diff --git a/pkg/storage/batcheval/cmd_end_transaction.go b/pkg/storage/batcheval/cmd_end_transaction.go index a8d8d5bb273a..9c956dc8b0c6 100644 --- a/pkg/storage/batcheval/cmd_end_transaction.go +++ b/pkg/storage/batcheval/cmd_end_transaction.go @@ -57,7 +57,7 @@ func declareKeysEndTransaction( if !et.Commit && et.Poison { abortSpanAccess = spanset.SpanReadWrite } - spans.Add(abortSpanAccess, roachpb.Span{ + spans.AddNonMVCC(abortSpanAccess, roachpb.Span{ Key: keys.AbortSpanKey(header.RangeID, header.Txn.ID), }) } @@ -68,13 +68,17 @@ func declareKeysEndTransaction( // All requests that intent on resolving local intents need to depend on // the range descriptor because they need to determine which intents are // within the local range. - spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)}) + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)}) // The spans may extend beyond this Range, but it's ok for the // purpose of acquiring latches. The parts in our Range will // be resolved eagerly. for _, span := range et.IntentSpans { - spans.Add(spanset.SpanReadWrite, span) + if keys.IsLocal(span.Key) { + spans.AddNonMVCC(spanset.SpanReadWrite, span) + } else { + spans.AddMVCC(spanset.SpanReadWrite, span, header.Timestamp) + } } if et.InternalCommitTrigger != nil { @@ -88,60 +92,62 @@ func declareKeysEndTransaction( // interfere with the non-delta stats computed as a part of the // split. (see // https://github.com/cockroachdb/cockroach/issues/14881) - spans.Add(spanset.SpanReadWrite, roachpb.Span{ + spans.AddMVCC(spanset.SpanReadWrite, roachpb.Span{ Key: st.LeftDesc.StartKey.AsRawKey(), EndKey: st.RightDesc.EndKey.AsRawKey(), - }) - spans.Add(spanset.SpanReadWrite, roachpb.Span{ + }, header.Timestamp) + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{ Key: keys.MakeRangeKeyPrefix(st.LeftDesc.StartKey), EndKey: keys.MakeRangeKeyPrefix(st.RightDesc.EndKey).PrefixEnd(), }) + leftRangeIDPrefix := keys.MakeRangeIDReplicatedPrefix(header.RangeID) - spans.Add(spanset.SpanReadOnly, roachpb.Span{ + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{ Key: leftRangeIDPrefix, EndKey: leftRangeIDPrefix.PrefixEnd(), }) - rightRangeIDPrefix := keys.MakeRangeIDReplicatedPrefix(st.RightDesc.RangeID) - spans.Add(spanset.SpanReadWrite, roachpb.Span{ + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{ Key: rightRangeIDPrefix, EndKey: rightRangeIDPrefix.PrefixEnd(), }) + rightRangeIDUnreplicatedPrefix := keys.MakeRangeIDUnreplicatedPrefix(st.RightDesc.RangeID) - spans.Add(spanset.SpanReadWrite, roachpb.Span{ + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{ Key: rightRangeIDUnreplicatedPrefix, EndKey: rightRangeIDUnreplicatedPrefix.PrefixEnd(), }) - spans.Add(spanset.SpanReadOnly, roachpb.Span{ + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{ Key: keys.RangeLastReplicaGCTimestampKey(st.LeftDesc.RangeID), }) - spans.Add(spanset.SpanReadWrite, roachpb.Span{ + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{ Key: keys.RangeLastReplicaGCTimestampKey(st.RightDesc.RangeID), }) - spans.Add(spanset.SpanReadOnly, roachpb.Span{ + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{ Key: abortspan.MinKey(header.RangeID), - EndKey: abortspan.MaxKey(header.RangeID)}) + EndKey: abortspan.MaxKey(header.RangeID), + }) } if mt := et.InternalCommitTrigger.MergeTrigger; mt != nil { // Merges write to the left side's abort span and the right side's data // and range-local spans. They also read from the right side's range ID // span. leftRangeIDPrefix := keys.MakeRangeIDReplicatedPrefix(header.RangeID) - spans.Add(spanset.SpanReadWrite, roachpb.Span{ + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{ Key: leftRangeIDPrefix, EndKey: leftRangeIDPrefix.PrefixEnd(), }) - spans.Add(spanset.SpanReadWrite, roachpb.Span{ + spans.AddMVCC(spanset.SpanReadWrite, roachpb.Span{ Key: mt.RightDesc.StartKey.AsRawKey(), EndKey: mt.RightDesc.EndKey.AsRawKey(), - }) - spans.Add(spanset.SpanReadWrite, roachpb.Span{ + }, header.Timestamp) + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{ Key: keys.MakeRangeKeyPrefix(mt.RightDesc.StartKey), EndKey: keys.MakeRangeKeyPrefix(mt.RightDesc.EndKey), }) - spans.Add(spanset.SpanReadOnly, roachpb.Span{ + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{ Key: keys.MakeRangeIDReplicatedPrefix(mt.RightDesc.RangeID), EndKey: keys.MakeRangeIDReplicatedPrefix(mt.RightDesc.RangeID).PrefixEnd(), }) diff --git a/pkg/storage/batcheval/cmd_gc.go b/pkg/storage/batcheval/cmd_gc.go index e4bd24b34465..3bf3828332c7 100644 --- a/pkg/storage/batcheval/cmd_gc.go +++ b/pkg/storage/batcheval/cmd_gc.go @@ -33,15 +33,19 @@ func declareKeysGC( // is usually the whole range (pending resolution of #7880). gcr := req.(*roachpb.GCRequest) for _, key := range gcr.Keys { - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: key.Key}) + if keys.IsLocal(key.Key) { + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: key.Key}) + } else { + spans.AddMVCC(spanset.SpanReadWrite, roachpb.Span{Key: key.Key}, header.Timestamp) + } } // Be smart here about blocking on the threshold keys. The GC queue can send an empty // request first to bump the thresholds, and then another one that actually does work // but can avoid declaring these keys below. if gcr.Threshold != (hlc.Timestamp{}) { - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeLastGCKey(header.RangeID)}) + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeLastGCKey(header.RangeID)}) } - spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)}) + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)}) } // GC iterates through the list of keys to garbage collect diff --git a/pkg/storage/batcheval/cmd_lease.go b/pkg/storage/batcheval/cmd_lease.go index 96861bdf1b2f..5f96518e264f 100644 --- a/pkg/storage/batcheval/cmd_lease.go +++ b/pkg/storage/batcheval/cmd_lease.go @@ -27,8 +27,8 @@ import ( func declareKeysRequestLease( desc *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, ) { - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeLeaseKey(header.RangeID)}) - spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)}) + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeLeaseKey(header.RangeID)}) + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)}) } func newFailedLeaseTrigger(isTransfer bool) result.Result { diff --git a/pkg/storage/batcheval/cmd_lease_info.go b/pkg/storage/batcheval/cmd_lease_info.go index 798e9caaf6e3..aaa388c205e7 100644 --- a/pkg/storage/batcheval/cmd_lease_info.go +++ b/pkg/storage/batcheval/cmd_lease_info.go @@ -27,7 +27,7 @@ func init() { func declareKeysLeaseInfo( _ *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, ) { - spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeLeaseKey(header.RangeID)}) + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeLeaseKey(header.RangeID)}) } // LeaseInfo returns information about the lease holder for the range. diff --git a/pkg/storage/batcheval/cmd_push_txn.go b/pkg/storage/batcheval/cmd_push_txn.go index e68aa4b85511..2614c59b076e 100644 --- a/pkg/storage/batcheval/cmd_push_txn.go +++ b/pkg/storage/batcheval/cmd_push_txn.go @@ -33,8 +33,8 @@ func declareKeysPushTransaction( _ *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, ) { pr := req.(*roachpb.PushTxnRequest) - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.TransactionKey(pr.PusheeTxn.Key, pr.PusheeTxn.ID)}) - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.AbortSpanKey(header.RangeID, pr.PusheeTxn.ID)}) + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.TransactionKey(pr.PusheeTxn.Key, pr.PusheeTxn.ID)}) + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.AbortSpanKey(header.RangeID, pr.PusheeTxn.ID)}) } // PushTxn resolves conflicts between concurrent txns (or between diff --git a/pkg/storage/batcheval/cmd_put.go b/pkg/storage/batcheval/cmd_put.go index 125f8466c37e..7b1b6306a0b7 100644 --- a/pkg/storage/batcheval/cmd_put.go +++ b/pkg/storage/batcheval/cmd_put.go @@ -13,14 +13,29 @@ package batcheval import ( "context" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) func init() { - RegisterCommand(roachpb.Put, DefaultDeclareKeys, Put) + RegisterCommand(roachpb.Put, declareKeysPut, Put) +} + +func declareKeysPut( + _ *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, +) { + args := req.(*roachpb.PutRequest) + access := spanset.SpanReadWrite + + if args.Inline || keys.IsLocal(req.Header().Span().Key) { + spans.AddNonMVCC(access, req.Header().Span()) + } else { + spans.AddMVCC(access, req.Header().Span(), header.Timestamp) + } } // Put sets the value for a specified key. diff --git a/pkg/storage/batcheval/cmd_query_intent.go b/pkg/storage/batcheval/cmd_query_intent.go index ba9a7a2eb8bc..de5494cf1aab 100644 --- a/pkg/storage/batcheval/cmd_query_intent.go +++ b/pkg/storage/batcheval/cmd_query_intent.go @@ -16,11 +16,21 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) func init() { - RegisterCommand(roachpb.QueryIntent, DefaultDeclareKeys, QueryIntent) + RegisterCommand(roachpb.QueryIntent, declareKeysQueryIntent, QueryIntent) +} + +func declareKeysQueryIntent( + _ *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, +) { + // QueryIntent requests read the specified keys at the maximum timestamp in + // order to read any intent present, if one exists, regardless of the + // timestamp it was written at. + spans.AddNonMVCC(spanset.SpanReadOnly, req.Header().Span()) } // QueryIntent checks if an intent exists for the specified transaction at the diff --git a/pkg/storage/batcheval/cmd_query_txn.go b/pkg/storage/batcheval/cmd_query_txn.go index aaf7bed2e3c2..990bb372e12f 100644 --- a/pkg/storage/batcheval/cmd_query_txn.go +++ b/pkg/storage/batcheval/cmd_query_txn.go @@ -32,7 +32,7 @@ func declareKeysQueryTransaction( _ *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, ) { qr := req.(*roachpb.QueryTxnRequest) - spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.TransactionKey(qr.Txn.Key, qr.Txn.ID)}) + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.TransactionKey(qr.Txn.Key, qr.Txn.ID)}) } // QueryTxn fetches the current state of a transaction. diff --git a/pkg/storage/batcheval/cmd_recompute_stats.go b/pkg/storage/batcheval/cmd_recompute_stats.go index cb0c013c578e..f15eabed7d85 100644 --- a/pkg/storage/batcheval/cmd_recompute_stats.go +++ b/pkg/storage/batcheval/cmd_recompute_stats.go @@ -46,8 +46,8 @@ func declareKeysRecomputeStats( // Note that we're also accessing the range stats key, but we don't declare it for the same // reasons as above. rdKey := keys.RangeDescriptorKey(desc.StartKey) - spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: rdKey}) - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.TransactionKey(rdKey, uuid.Nil)}) + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: rdKey}) + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.TransactionKey(rdKey, uuid.Nil)}) } // RecomputeStats recomputes the MVCCStats stored for this range and adjust them accordingly, diff --git a/pkg/storage/batcheval/cmd_recover_txn.go b/pkg/storage/batcheval/cmd_recover_txn.go index 2c8e1cf3162a..67141c9b6cec 100644 --- a/pkg/storage/batcheval/cmd_recover_txn.go +++ b/pkg/storage/batcheval/cmd_recover_txn.go @@ -32,8 +32,8 @@ func declareKeysRecoverTransaction( _ *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, ) { rr := req.(*roachpb.RecoverTxnRequest) - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.TransactionKey(rr.Txn.Key, rr.Txn.ID)}) - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.AbortSpanKey(header.RangeID, rr.Txn.ID)}) + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.TransactionKey(rr.Txn.Key, rr.Txn.ID)}) + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.AbortSpanKey(header.RangeID, rr.Txn.ID)}) } // RecoverTxn attempts to recover the specified transaction from an diff --git a/pkg/storage/batcheval/cmd_resolve_intent.go b/pkg/storage/batcheval/cmd_resolve_intent.go index 7fc862442651..b88dbd998db6 100644 --- a/pkg/storage/batcheval/cmd_resolve_intent.go +++ b/pkg/storage/batcheval/cmd_resolve_intent.go @@ -40,7 +40,7 @@ func declareKeysResolveIntentCombined( txnID = t.IntentTxn.ID } if WriteAbortSpanOnResolve(status) { - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.AbortSpanKey(header.RangeID, txnID)}) + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.AbortSpanKey(header.RangeID, txnID)}) } } diff --git a/pkg/storage/batcheval/cmd_revert_range.go b/pkg/storage/batcheval/cmd_revert_range.go index 68d826f2721c..f2f7d9970fc8 100644 --- a/pkg/storage/batcheval/cmd_revert_range.go +++ b/pkg/storage/batcheval/cmd_revert_range.go @@ -33,8 +33,8 @@ func declareKeysRevertRange( DefaultDeclareKeys(desc, header, req, spans) // We look up the range descriptor key to check whether the span // is equal to the entire range for fast stats updating. - spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)}) - spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeLastGCKey(desc.RangeID)}) + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)}) + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeLastGCKey(desc.RangeID)}) } // isEmptyKeyTimeRange checks if the span has no writes in (since,until]. diff --git a/pkg/storage/batcheval/cmd_subsume.go b/pkg/storage/batcheval/cmd_subsume.go index f8c9bf27563d..2bae3548ec25 100644 --- a/pkg/storage/batcheval/cmd_subsume.go +++ b/pkg/storage/batcheval/cmd_subsume.go @@ -30,8 +30,8 @@ func init() { func declareKeysSubsume( desc *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, ) { - // Subsume must not run concurrently with any other command. It declares that - // it reads and writes every addressable key in the range; this guarantees + // Subsume must not run concurrently with any other command. It declares a + // non-MVCC write over every addressable key in the range; this guarantees // that it conflicts with any other command because every command must declare // at least one addressable key. It does not, in fact, write any keys. // @@ -41,16 +41,16 @@ func declareKeysSubsume( if args.RightDesc != nil { desc = args.RightDesc } - spans.Add(spanset.SpanReadWrite, roachpb.Span{ + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{ Key: desc.StartKey.AsRawKey(), EndKey: desc.EndKey.AsRawKey(), }) - spans.Add(spanset.SpanReadWrite, roachpb.Span{ + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{ Key: keys.MakeRangeKeyPrefix(desc.StartKey), EndKey: keys.MakeRangeKeyPrefix(desc.EndKey).PrefixEnd(), }) rangeIDPrefix := keys.MakeRangeIDReplicatedPrefix(desc.RangeID) - spans.Add(spanset.SpanReadWrite, roachpb.Span{ + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{ Key: rangeIDPrefix, EndKey: rangeIDPrefix.PrefixEnd(), }) diff --git a/pkg/storage/batcheval/cmd_truncate_log.go b/pkg/storage/batcheval/cmd_truncate_log.go index 8a6e90b3be66..df06feacc076 100644 --- a/pkg/storage/batcheval/cmd_truncate_log.go +++ b/pkg/storage/batcheval/cmd_truncate_log.go @@ -31,9 +31,9 @@ func init() { func declareKeysTruncateLog( _ *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, ) { - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.RaftTruncatedStateLegacyKey(header.RangeID)}) + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.RaftTruncatedStateLegacyKey(header.RangeID)}) prefix := keys.RaftLogPrefix(header.RangeID) - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}) + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}) } // TruncateLog discards a prefix of the raft log. Truncating part of a log that diff --git a/pkg/storage/batcheval/command.go b/pkg/storage/batcheval/command.go index 5f451f3100ab..1e8b34cba40e 100644 --- a/pkg/storage/batcheval/command.go +++ b/pkg/storage/batcheval/command.go @@ -22,7 +22,7 @@ import ( // A Command is the implementation of a single request within a BatchRequest. type Command struct { - // DeclareKeys adds all keys this command touches to the given SpanSet. + // DeclareKeys adds all keys this command touches, and when (if applicable), to the given SpanSet. // TODO(nvanbenschoten): rationalize this RangeDescriptor. Can it change // between key declaration and cmd evaluation? DeclareKeys func(*roachpb.RangeDescriptor, roachpb.Header, roachpb.Request, *spanset.SpanSet) diff --git a/pkg/storage/batcheval/declare.go b/pkg/storage/batcheval/declare.go index 8e46bea7850c..7fc7f1df302e 100644 --- a/pkg/storage/batcheval/declare.go +++ b/pkg/storage/batcheval/declare.go @@ -21,12 +21,19 @@ import ( // DefaultDeclareKeys is the default implementation of Command.DeclareKeys. func DefaultDeclareKeys( - desc *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, + _ *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, ) { + var access spanset.SpanAccess if roachpb.IsReadOnly(req) { - spans.Add(spanset.SpanReadOnly, req.Header().Span()) + access = spanset.SpanReadOnly } else { - spans.Add(spanset.SpanReadWrite, req.Header().Span()) + access = spanset.SpanReadWrite + } + + if keys.IsLocal(req.Header().Span().Key) { + spans.AddNonMVCC(access, req.Header().Span()) + } else { + spans.AddMVCC(access, req.Header().Span(), header.Timestamp) } } @@ -38,13 +45,13 @@ func DeclareKeysForBatch( ) { if header.Txn != nil { header.Txn.AssertInitialized(context.TODO()) - spans.Add(spanset.SpanReadOnly, roachpb.Span{ + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{ Key: keys.AbortSpanKey(header.RangeID, header.Txn.ID), }) } if header.ReturnRangeInfo { - spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeLeaseKey(header.RangeID)}) - spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)}) + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeLeaseKey(header.RangeID)}) + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)}) } } diff --git a/pkg/storage/client_split_test.go b/pkg/storage/client_split_test.go index 77a59cd53c25..2134edafffad 100644 --- a/pkg/storage/client_split_test.go +++ b/pkg/storage/client_split_test.go @@ -3103,7 +3103,7 @@ func TestRangeLookupAsyncResolveIntent(t *testing.T) { } } -// Verify that replicas don't temporrily disappear from the replicas map during +// Verify that replicas don't temporarily disappear from the replicas map during // the splits. See #29144. func TestStoreSplitDisappearingReplicas(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/storage/rditer/replica_data_iter_test.go b/pkg/storage/rditer/replica_data_iter_test.go index 87473897d61b..8ceb51eb4f10 100644 --- a/pkg/storage/rditer/replica_data_iter_test.go +++ b/pkg/storage/rditer/replica_data_iter_test.go @@ -128,19 +128,19 @@ func verifyRDIter( testutils.RunTrueAndFalse(t, "spanset", func(t *testing.T, useSpanSet bool) { if useSpanSet { var spans spanset.SpanSet - spans.Add(spanset.SpanReadOnly, roachpb.Span{ + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{ Key: keys.MakeRangeIDPrefix(desc.RangeID), EndKey: keys.MakeRangeIDPrefix(desc.RangeID).PrefixEnd(), }) - spans.Add(spanset.SpanReadOnly, roachpb.Span{ + spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{ Key: keys.MakeRangeKeyPrefix(desc.StartKey), EndKey: keys.MakeRangeKeyPrefix(desc.EndKey), }) - spans.Add(spanset.SpanReadOnly, roachpb.Span{ + spans.AddMVCC(spanset.SpanReadOnly, roachpb.Span{ Key: desc.StartKey.AsRawKey(), EndKey: desc.EndKey.AsRawKey(), - }) - eng = spanset.NewReadWriter(eng, &spans) + }, hlc.Timestamp{WallTime: 42}) + eng = spanset.NewReadWriterAt(eng, &spans, hlc.Timestamp{WallTime: 42}) } iter := NewReplicaDataIterator(desc, eng, replicatedOnly) defer iter.Close() diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 07c7f6f5a562..37564dbad578 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -1207,6 +1207,13 @@ func (r *Replica) collectSpans(ba *roachpb.BatchRequest) (*spanset.SpanSet, erro spans.Reserve(spanset.SpanReadWrite, spanset.SpanGlobal, guess) } + // For non-local, MVCC spans we annotate them with the request timestamp + // during declaration. This is the timestamp used during latch acquisitions. + // For read requests this works as expected, reads are performed at the same + // timestamp. During writes however, we may encounter a versioned value newer + // than the request timestamp, and may have to retry at a higher timestamp. + // This is still safe as we're only ever writing at timestamps higher than the + // timestamp any write latch would be declared at. desc := r.Desc() batcheval.DeclareKeysForBatch(desc, ba.Header, spans) for _, union := range ba.Requests { @@ -1261,7 +1268,7 @@ func (r *Replica) beginCmds( // protected access and to avoid interacting requests from operating at // the same time. The latches will be held for the duration of request. var err error - lg, err = r.latchMgr.Acquire(ctx, spans, ba.Timestamp) + lg, err = r.latchMgr.Acquire(ctx, spans) if err != nil { return endCmds{}, err } @@ -1424,7 +1431,7 @@ func (r *Replica) executeAdminBatch( case *roachpb.ImportRequest: cArgs := batcheval.CommandArgs{ - EvalCtx: NewReplicaEvalContext(r, &spanset.SpanSet{}), + EvalCtx: NewReplicaEvalContext(r, todoSpanSet), Header: ba.Header, Args: args, } diff --git a/pkg/storage/replica_read.go b/pkg/storage/replica_read.go index fd6c008847a6..e90097ec375b 100644 --- a/pkg/storage/replica_read.go +++ b/pkg/storage/replica_read.go @@ -88,7 +88,7 @@ func (r *Replica) executeReadOnlyBatch( rec := NewReplicaEvalContext(r, spans) readOnly := r.store.Engine().NewReadOnly() if util.RaceEnabled { - readOnly = spanset.NewReadWriter(readOnly, spans) + readOnly = spanset.NewReadWriterAt(readOnly, spans, ba.Timestamp) } defer readOnly.Close() br, result, pErr = evaluateBatch(ctx, storagebase.CmdIDKey(""), readOnly, rec, nil, ba, true /* readOnly */) diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 9595c3f77df3..d8aac3b43ae3 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -74,12 +74,12 @@ import ( // care about properly declaring their spans. var allSpans = func() spanset.SpanSet { var ss spanset.SpanSet - ss.Add(spanset.SpanReadWrite, roachpb.Span{ + ss.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{ Key: roachpb.KeyMin, EndKey: roachpb.KeyMax, }) // Local keys (see `keys.localPrefix`). - ss.Add(spanset.SpanReadWrite, roachpb.Span{ + ss.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{ Key: append([]byte("\x01"), roachpb.KeyMin...), EndKey: append([]byte("\x01"), roachpb.KeyMax...), }) diff --git a/pkg/storage/replica_write.go b/pkg/storage/replica_write.go index b44b66f9a40f..53db3d3cb23a 100644 --- a/pkg/storage/replica_write.go +++ b/pkg/storage/replica_write.go @@ -407,6 +407,11 @@ func (r *Replica) evaluateWriteBatchWithLocalRetries( batch = opLogger } if util.RaceEnabled { + // During writes we may encounter a versioned value newer than the request + // timestamp, and may have to retry at a higher timestamp. This is still + // safe as we're only ever writing at timestamps higher than the timestamp + // any write latch would be declared at. But because of this, we don't + // assert on access timestamps using spanset.NewBatchAt. batch = spanset.NewBatch(batch, spans) } diff --git a/pkg/storage/spanlatch/doc.go b/pkg/storage/spanlatch/doc.go index 67aac0997677..53795ddb4a04 100644 --- a/pkg/storage/spanlatch/doc.go +++ b/pkg/storage/spanlatch/doc.go @@ -10,7 +10,7 @@ /* Package spanlatch provides a latch management structure for serializing access -to keys and key ranges. Latch acquitions affecting keys or key ranges must wait +to keys and key ranges. Latch acquisitions affecting keys or key ranges must wait on already-acquired latches which overlap their key range to be released. The evolution of complexity can best be understood as a series of incremental diff --git a/pkg/storage/spanlatch/interval_btree.go b/pkg/storage/spanlatch/interval_btree.go index 63fbca0dcbca..4d8071bc7b31 100644 --- a/pkg/storage/spanlatch/interval_btree.go +++ b/pkg/storage/spanlatch/interval_btree.go @@ -968,7 +968,7 @@ func (i *iterator) Valid() bool { } // Cur returns the latch at the iterator's current position. It is illegal -// to call Latch if the iterator is not valid. +// to call Cur if the iterator is not valid. func (i *iterator) Cur() *latch { return i.n.latches[i.pos] } diff --git a/pkg/storage/spanlatch/manager.go b/pkg/storage/spanlatch/manager.go index 8939b9bcd662..a5cfc072fb1b 100644 --- a/pkg/storage/spanlatch/manager.go +++ b/pkg/storage/spanlatch/manager.go @@ -155,14 +155,8 @@ func allocGuardAndLatches(nLatches int) (*Guard, []latch) { return new(Guard), make([]latch, nLatches) } -func newGuard(spans *spanset.SpanSet, ts hlc.Timestamp) *Guard { - nLatches := 0 - for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { - for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { - nLatches += len(spans.GetSpans(a, s)) - } - } - +func newGuard(spans *spanset.SpanSet) *Guard { + nLatches := spans.Len() guard, latches := allocGuardAndLatches(nLatches) for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { @@ -175,9 +169,9 @@ func newGuard(spans *spanset.SpanSet, ts hlc.Timestamp) *Guard { ssLatches := latches[:n] for i := range ssLatches { latch := &latches[i] - latch.span = ss[i] - latch.ts = ifGlobal(ts, s) + latch.span = ss[i].Span latch.done = &guard.done + latch.ts = ss[i].Timestamp // latch.setID() in Manager.insert, under lock. } guard.setLatches(s, a, ssLatches) @@ -198,10 +192,8 @@ func newGuard(spans *spanset.SpanSet, ts hlc.Timestamp) *Guard { // acquired. // // It returns a Guard which must be provided to Release. -func (m *Manager) Acquire( - ctx context.Context, spans *spanset.SpanSet, ts hlc.Timestamp, -) (*Guard, error) { - lg, snap := m.sequence(spans, ts) +func (m *Manager) Acquire(ctx context.Context, spans *spanset.SpanSet) (*Guard, error) { + lg, snap := m.sequence(spans) defer snap.close() err := m.wait(ctx, lg, snap) @@ -216,8 +208,8 @@ func (m *Manager) Acquire( // for each of the specified spans into the manager's interval trees, and // unlocks the manager. The role of the method is to sequence latch acquisition // attempts. -func (m *Manager) sequence(spans *spanset.SpanSet, ts hlc.Timestamp) (*Guard, snapshot) { - lg := newGuard(spans, ts) +func (m *Manager) sequence(spans *spanset.SpanSet) (*Guard, snapshot) { + lg := newGuard(spans) m.mu.Lock() snap := m.snapshotLocked(spans) @@ -317,18 +309,6 @@ func ignoreLater(ts, other hlc.Timestamp) bool { return !ts.IsEmpty() && ts.Le func ignoreEarlier(ts, other hlc.Timestamp) bool { return !other.IsEmpty() && other.Less(ts) } func ignoreNothing(ts, other hlc.Timestamp) bool { return false } -func ifGlobal(ts hlc.Timestamp, s spanset.SpanScope) hlc.Timestamp { - switch s { - case spanset.SpanGlobal: - return ts - case spanset.SpanLocal: - // All local latches interfere. - return hlc.Timestamp{} - default: - panic("unknown scope") - } -} - // wait waits for all interfering latches in the provided snapshot to complete // before returning. func (m *Manager) wait(ctx context.Context, lg *Guard, snap snapshot) error { diff --git a/pkg/storage/spanlatch/manager_test.go b/pkg/storage/spanlatch/manager_test.go index 33e854534c27..530d0abbb5b5 100644 --- a/pkg/storage/spanlatch/manager_test.go +++ b/pkg/storage/spanlatch/manager_test.go @@ -32,13 +32,13 @@ var read = false var write = true var zeroTS = hlc.Timestamp{} -func spans(from, to string, write bool) *spanset.SpanSet { - var spans spanset.SpanSet - add(&spans, from, to, write) - return &spans +func spans(from, to string, write bool, ts hlc.Timestamp) *spanset.SpanSet { + var spanSet spanset.SpanSet + add(&spanSet, from, to, write, ts) + return &spanSet } -func add(spans *spanset.SpanSet, from, to string, write bool) { +func add(spanSet *spanset.SpanSet, from, to string, write bool, ts hlc.Timestamp) { var start, end roachpb.Key if to == "" { start = roachpb.Key(from) @@ -56,7 +56,12 @@ func add(spans *spanset.SpanSet, from, to string, write bool) { if write { access = spanset.SpanReadWrite } - spans.Add(access, roachpb.Span{Key: start, EndKey: end}) + + if strings.HasPrefix(from, "local") { + spanSet.AddNonMVCC(access, roachpb.Span{Key: start, EndKey: end}) + } else { + spanSet.AddMVCC(access, roachpb.Span{Key: start, EndKey: end}, ts) + } } func testLatchSucceeds(t *testing.T, lgC <-chan *Guard) *Guard { @@ -85,8 +90,8 @@ func testLatchBlocks(t *testing.T, lgC <-chan *Guard) { // MustAcquire is like Acquire, except it can't return context cancellation // errors. -func (m *Manager) MustAcquire(spans *spanset.SpanSet, ts hlc.Timestamp) *Guard { - lg, err := m.Acquire(context.Background(), spans, ts) +func (m *Manager) MustAcquire(spans *spanset.SpanSet) *Guard { + lg, err := m.Acquire(context.Background(), spans) if err != nil { panic(err) } @@ -98,16 +103,14 @@ func (m *Manager) MustAcquire(spans *spanset.SpanSet, ts hlc.Timestamp) *Guard { // returns a channel that provides the Guard when the latches are acquired (i.e. // after waiting). If the context expires, a nil Guard will be delivered on the // channel. -func (m *Manager) MustAcquireCh(spans *spanset.SpanSet, ts hlc.Timestamp) <-chan *Guard { - return m.MustAcquireChCtx(context.Background(), spans, ts) +func (m *Manager) MustAcquireCh(spans *spanset.SpanSet) <-chan *Guard { + return m.MustAcquireChCtx(context.Background(), spans) } // MustAcquireChCtx is like MustAcquireCh, except it accepts a context. -func (m *Manager) MustAcquireChCtx( - ctx context.Context, spans *spanset.SpanSet, ts hlc.Timestamp, -) <-chan *Guard { +func (m *Manager) MustAcquireChCtx(ctx context.Context, spans *spanset.SpanSet) <-chan *Guard { ch := make(chan *Guard) - lg, snap := m.sequence(spans, ts) + lg, snap := m.sequence(spans) go func() { err := m.wait(ctx, lg, snap) if err != nil { @@ -124,15 +127,15 @@ func TestLatchManager(t *testing.T) { var m Manager // Try latches with no overlapping already-acquired latches. - lg1 := m.MustAcquire(spans("a", "", write), zeroTS) + lg1 := m.MustAcquire(spans("a", "", write, zeroTS)) m.Release(lg1) - lg2 := m.MustAcquire(spans("a", "b", write), zeroTS) + lg2 := m.MustAcquire(spans("a", "b", write, zeroTS)) m.Release(lg2) // Add a latch and verify overlapping latches wait on it. - lg3 := m.MustAcquire(spans("a", "b", write), zeroTS) - lg4C := m.MustAcquireCh(spans("a", "b", write), zeroTS) + lg3 := m.MustAcquire(spans("a", "b", write, zeroTS)) + lg4C := m.MustAcquireCh(spans("a", "b", write, zeroTS)) // Second write should block. testLatchBlocks(t, lg4C) @@ -156,11 +159,11 @@ func TestLatchManagerAcquireOverlappingSpans(t *testing.T) { // var ts0, ts1 = hlc.Timestamp{WallTime: 0}, hlc.Timestamp{WallTime: 1} var spanSet spanset.SpanSet - add(&spanSet, "a", "c", read) - add(&spanSet, "b", "d", write) - lg1 := m.MustAcquire(&spanSet, ts1) + add(&spanSet, "a", "c", read, ts1) + add(&spanSet, "b", "d", write, ts1) + lg1 := m.MustAcquire(&spanSet) - lg2C := m.MustAcquireCh(spans("a", "b", read), ts0) + lg2C := m.MustAcquireCh(spans("a", "b", read, ts0)) lg2 := testLatchSucceeds(t, lg2C) m.Release(lg2) @@ -168,11 +171,45 @@ func TestLatchManagerAcquireOverlappingSpans(t *testing.T) { // acquisitions based on the original latch, not the latches declared in // earlier test cases. var latchCs []<-chan *Guard - latchCs = append(latchCs, m.MustAcquireCh(spans("a", "b", write), ts1)) - latchCs = append(latchCs, m.MustAcquireCh(spans("b", "c", read), ts0)) - latchCs = append(latchCs, m.MustAcquireCh(spans("b", "c", write), ts1)) - latchCs = append(latchCs, m.MustAcquireCh(spans("c", "d", write), ts1)) - latchCs = append(latchCs, m.MustAcquireCh(spans("c", "d", read), ts0)) + latchCs = append(latchCs, m.MustAcquireCh(spans("a", "b", write, ts1))) + latchCs = append(latchCs, m.MustAcquireCh(spans("b", "c", read, ts0))) + latchCs = append(latchCs, m.MustAcquireCh(spans("b", "c", write, ts1))) + latchCs = append(latchCs, m.MustAcquireCh(spans("c", "d", write, ts1))) + latchCs = append(latchCs, m.MustAcquireCh(spans("c", "d", read, ts0))) + + for _, lgC := range latchCs { + testLatchBlocks(t, lgC) + } + + m.Release(lg1) + + for _, lgC := range latchCs { + lg := testLatchSucceeds(t, lgC) + m.Release(lg) + } +} + +func TestLatchManagerAcquiringReadsVaryingTimestamps(t *testing.T) { + defer leaktest.AfterTest(t)() + var m Manager + + var ts0, ts1 = hlc.Timestamp{WallTime: 0}, hlc.Timestamp{WallTime: 1} + var spanSet spanset.SpanSet + add(&spanSet, "a", "", read, ts0) + add(&spanSet, "a", "", read, ts1) + lg1 := m.MustAcquire(&spanSet) + + for _, walltime := range []int64{0, 1, 2} { + ts := hlc.Timestamp{WallTime: walltime} + lg := testLatchSucceeds(t, m.MustAcquireCh(spans("a", "", read, ts))) + m.Release(lg) + } + + var latchCs []<-chan *Guard + for _, walltime := range []int64{0, 1, 2} { + ts := hlc.Timestamp{WallTime: walltime} + latchCs = append(latchCs, m.MustAcquireCh(spans("a", "", write, ts))) + } for _, lgC := range latchCs { testLatchBlocks(t, lgC) @@ -191,10 +228,10 @@ func TestLatchManagerNoWaitOnReadOnly(t *testing.T) { var m Manager // Acquire latch for read-only span. - m.MustAcquire(spans("a", "", read), zeroTS) + m.MustAcquire(spans("a", "", read, zeroTS)) // Verify no wait with another read-only span. - m.MustAcquire(spans("a", "", read), zeroTS) + m.MustAcquire(spans("a", "", read, zeroTS)) } func TestLatchManagerWriteWaitForMultipleReads(t *testing.T) { @@ -202,12 +239,12 @@ func TestLatchManagerWriteWaitForMultipleReads(t *testing.T) { var m Manager // Acquire latch for read-only span. - lg1 := m.MustAcquire(spans("a", "", read), zeroTS) + lg1 := m.MustAcquire(spans("a", "", read, zeroTS)) // Acquire another one on top. - lg2 := m.MustAcquire(spans("a", "", read), zeroTS) + lg2 := m.MustAcquire(spans("a", "", read, zeroTS)) // A write span should have to wait for **both** reads. - lg3C := m.MustAcquireCh(spans("a", "", write), zeroTS) + lg3C := m.MustAcquireCh(spans("a", "", write, zeroTS)) // Certainly blocks now. testLatchBlocks(t, lg3C) @@ -230,12 +267,12 @@ func TestLatchManagerMultipleOverlappingLatches(t *testing.T) { var m Manager // Acquire multiple latches. - lg1C := m.MustAcquireCh(spans("a", "", write), zeroTS) - lg2C := m.MustAcquireCh(spans("b", "c", write), zeroTS) - lg3C := m.MustAcquireCh(spans("a", "d", write), zeroTS) + lg1C := m.MustAcquireCh(spans("a", "", write, zeroTS)) + lg2C := m.MustAcquireCh(spans("b", "c", write, zeroTS)) + lg3C := m.MustAcquireCh(spans("a", "d", write, zeroTS)) // Attempt to acquire latch which overlaps them all. - lg4C := m.MustAcquireCh(spans("0", "z", write), zeroTS) + lg4C := m.MustAcquireCh(spans("0", "z", write, zeroTS)) testLatchBlocks(t, lg4C) m.Release(<-lg1C) testLatchBlocks(t, lg4C) @@ -250,17 +287,17 @@ func TestLatchManagerMultipleOverlappingSpans(t *testing.T) { var m Manager // Acquire multiple latches. - lg1 := m.MustAcquire(spans("a", "", write), zeroTS) - lg2 := m.MustAcquire(spans("b", "c", read), zeroTS) - lg3 := m.MustAcquire(spans("d", "f", write), zeroTS) - lg4 := m.MustAcquire(spans("g", "", write), zeroTS) + lg1 := m.MustAcquire(spans("a", "", write, zeroTS)) + lg2 := m.MustAcquire(spans("b", "c", read, zeroTS)) + lg3 := m.MustAcquire(spans("d", "f", write, zeroTS)) + lg4 := m.MustAcquire(spans("g", "", write, zeroTS)) // Attempt to acquire latches overlapping each of them. - var spanSet spanset.SpanSet - add(&spanSet, "a", "", write) - add(&spanSet, "b", "", write) - add(&spanSet, "e", "", write) - lg5C := m.MustAcquireCh(&spanSet, zeroTS) + var spans spanset.SpanSet + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("a")}) + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("b")}) + spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("e")}) + lg5C := m.MustAcquireCh(&spans) // Blocks until the first three prerequisite latches release. testLatchBlocks(t, lg5C) @@ -280,185 +317,169 @@ func TestLatchManagerDependentLatches(t *testing.T) { cases := []struct { name string sp1 *spanset.SpanSet - ts1 hlc.Timestamp sp2 *spanset.SpanSet - ts2 hlc.Timestamp dependent bool }{ { name: "point writes, same key", - sp1: spans("a", "", write), - sp2: spans("a", "", write), + sp1: spans("a", "", write, zeroTS), + sp2: spans("a", "", write, zeroTS), dependent: true, }, { name: "point writes, different key", - sp1: spans("a", "", write), - sp2: spans("b", "", write), + sp1: spans("a", "", write, zeroTS), + sp2: spans("b", "", write, zeroTS), dependent: false, }, { name: "range writes, overlapping span", - sp1: spans("a", "c", write), - sp2: spans("b", "d", write), + sp1: spans("a", "c", write, zeroTS), + sp2: spans("b", "d", write, zeroTS), dependent: true, }, { name: "range writes, non-overlapping span", - sp1: spans("a", "b", write), - sp2: spans("b", "c", write), + sp1: spans("a", "b", write, zeroTS), + sp2: spans("b", "c", write, zeroTS), dependent: false, }, { name: "point reads, same key", - sp1: spans("a", "", read), - sp2: spans("a", "", read), + sp1: spans("a", "", read, zeroTS), + sp2: spans("a", "", read, zeroTS), dependent: false, }, { name: "point reads, different key", - sp1: spans("a", "", read), - sp2: spans("b", "", read), + sp1: spans("a", "", read, zeroTS), + sp2: spans("b", "", read, zeroTS), dependent: false, }, { name: "range reads, overlapping span", - sp1: spans("a", "c", read), - sp2: spans("b", "d", read), + sp1: spans("a", "c", read, zeroTS), + sp2: spans("b", "d", read, zeroTS), dependent: false, }, { name: "range reads, non-overlapping span", - sp1: spans("a", "b", read), - sp2: spans("b", "c", read), + sp1: spans("a", "b", read, zeroTS), + sp2: spans("b", "c", read, zeroTS), dependent: false, }, { name: "read and write, same ts", - sp1: spans("a", "", write), - ts1: hlc.Timestamp{WallTime: 1}, - sp2: spans("a", "", read), - ts2: hlc.Timestamp{WallTime: 1}, + sp1: spans("a", "", write, hlc.Timestamp{WallTime: 1}), + sp2: spans("a", "", read, hlc.Timestamp{WallTime: 1}), dependent: true, }, { name: "read and write, causal ts", - sp1: spans("a", "", write), - ts1: hlc.Timestamp{WallTime: 1}, - sp2: spans("a", "", read), - ts2: hlc.Timestamp{WallTime: 2}, + sp1: spans("a", "", write, hlc.Timestamp{WallTime: 1}), + sp2: spans("a", "", read, hlc.Timestamp{WallTime: 2}), dependent: true, }, { name: "read and write, non-causal ts", - sp1: spans("a", "", write), - ts1: hlc.Timestamp{WallTime: 2}, - sp2: spans("a", "", read), - ts2: hlc.Timestamp{WallTime: 1}, + sp1: spans("a", "", write, hlc.Timestamp{WallTime: 2}), + sp2: spans("a", "", read, hlc.Timestamp{WallTime: 1}), dependent: false, }, { name: "read and write, zero ts read", - sp1: spans("a", "", write), - ts1: hlc.Timestamp{WallTime: 1}, - sp2: spans("a", "", read), - ts2: hlc.Timestamp{WallTime: 0}, + sp1: spans("a", "", write, hlc.Timestamp{WallTime: 1}), + sp2: spans("a", "", read, hlc.Timestamp{WallTime: 0}), dependent: true, }, + { + name: "point reads, different ts", + sp1: spans("a", "", read, hlc.Timestamp{WallTime: 1}), + sp2: spans("a", "", read, hlc.Timestamp{WallTime: 0}), + dependent: false, + }, { name: "read and write, zero ts write", - sp1: spans("a", "", write), - ts1: hlc.Timestamp{WallTime: 0}, - sp2: spans("a", "", read), - ts2: hlc.Timestamp{WallTime: 1}, + sp1: spans("a", "", write, hlc.Timestamp{WallTime: 0}), + sp2: spans("a", "", read, hlc.Timestamp{WallTime: 1}), dependent: true, }, { name: "read and write, non-overlapping", - sp1: spans("a", "b", write), - sp2: spans("b", "", read), + sp1: spans("a", "b", write, zeroTS), + sp2: spans("b", "", read, zeroTS), dependent: false, }, { name: "local range writes, overlapping span", - sp1: spans("local a", "local c", write), - sp2: spans("local b", "local d", write), + sp1: spans("local a", "local c", write, zeroTS), + sp2: spans("local b", "local d", write, zeroTS), dependent: true, }, { name: "local range writes, non-overlapping span", - sp1: spans("local a", "local b", write), - sp2: spans("local b", "local c", write), + sp1: spans("local a", "local b", write, zeroTS), + sp2: spans("local b", "local c", write, zeroTS), dependent: false, }, { name: "local range reads, overlapping span", - sp1: spans("local a", "local c", read), - sp2: spans("local b", "local d", read), + sp1: spans("local a", "local c", read, zeroTS), + sp2: spans("local b", "local d", read, zeroTS), dependent: false, }, { name: "local range reads, non-overlapping span", - sp1: spans("local a", "local b", read), - sp2: spans("local b", "local c", read), + sp1: spans("local a", "local b", read, zeroTS), + sp2: spans("local b", "local c", read, zeroTS), dependent: false, }, { name: "local read and write, same ts", - sp1: spans("local a", "", write), - ts1: hlc.Timestamp{WallTime: 1}, - sp2: spans("local a", "", read), - ts2: hlc.Timestamp{WallTime: 1}, + sp1: spans("local a", "", write, hlc.Timestamp{WallTime: 1}), + sp2: spans("local a", "", read, hlc.Timestamp{WallTime: 1}), dependent: true, }, { name: "local read and write, causal ts", - sp1: spans("local a", "", write), - ts1: hlc.Timestamp{WallTime: 1}, - sp2: spans("local a", "", read), - ts2: hlc.Timestamp{WallTime: 2}, + sp1: spans("local a", "", write, hlc.Timestamp{WallTime: 1}), + sp2: spans("local a", "", read, hlc.Timestamp{WallTime: 2}), dependent: true, }, { name: "local read and write, non-causal ts", - sp1: spans("local a", "", write), - ts1: hlc.Timestamp{WallTime: 2}, - sp2: spans("local a", "", read), - ts2: hlc.Timestamp{WallTime: 1}, + sp1: spans("local a", "", write, hlc.Timestamp{WallTime: 2}), + sp2: spans("local a", "", read, hlc.Timestamp{WallTime: 1}), dependent: true, }, { name: "local read and write, zero ts read", - sp1: spans("local a", "", write), - ts1: hlc.Timestamp{WallTime: 1}, - sp2: spans("local a", "", read), - ts2: hlc.Timestamp{WallTime: 0}, + sp1: spans("local a", "", write, hlc.Timestamp{WallTime: 1}), + sp2: spans("local a", "", read, hlc.Timestamp{WallTime: 0}), dependent: true, }, { name: "local read and write, zero ts write", - sp1: spans("local a", "", write), - ts1: hlc.Timestamp{WallTime: 0}, - sp2: spans("local a", "", read), - ts2: hlc.Timestamp{WallTime: 1}, + sp1: spans("local a", "", write, hlc.Timestamp{WallTime: 0}), + sp2: spans("local a", "", read, hlc.Timestamp{WallTime: 1}), dependent: true, }, { name: "local read and write, non-overlapping", - sp1: spans("a", "b", write), - sp2: spans("b", "", read), + sp1: spans("a", "b", write, zeroTS), + sp2: spans("b", "", read, zeroTS), dependent: false, }, { name: "local read and global write, overlapping", - sp1: spans("a", "b", write), - sp2: spans("local b", "", read), + sp1: spans("a", "b", write, zeroTS), + sp2: spans("local b", "", read, zeroTS), dependent: false, }, { name: "local write and global read, overlapping", - sp1: spans("local a", "local b", write), - sp2: spans("b", "", read), + sp1: spans("local a", "local b", write, zeroTS), + sp2: spans("b", "", read, zeroTS), dependent: false, }, } @@ -468,12 +489,11 @@ func TestLatchManagerDependentLatches(t *testing.T) { c := c if inv { c.sp1, c.sp2 = c.sp2, c.sp1 - c.ts1, c.ts2 = c.ts2, c.ts1 } var m Manager - lg1 := m.MustAcquire(c.sp1, c.ts1) - lg2C := m.MustAcquireCh(c.sp2, c.ts2) + lg1 := m.MustAcquire(c.sp1) + lg2C := m.MustAcquireCh(c.sp2) if c.dependent { testLatchBlocks(t, lg2C) m.Release(lg1) @@ -494,11 +514,11 @@ func TestLatchManagerContextCancellation(t *testing.T) { var m Manager // Attempt to acquire three latches that all block on each other. - lg1 := m.MustAcquire(spans("a", "", write), zeroTS) + lg1 := m.MustAcquire(spans("a", "", write, zeroTS)) // The second one is given a cancelable context. ctx2, cancel2 := context.WithCancel(context.Background()) - lg2C := m.MustAcquireChCtx(ctx2, spans("a", "", write), zeroTS) - lg3C := m.MustAcquireCh(spans("a", "", write), zeroTS) + lg2C := m.MustAcquireChCtx(ctx2, spans("a", "", write, zeroTS)) + lg3C := m.MustAcquireCh(spans("a", "", write, zeroTS)) // The second and third latch attempt block on the first. testLatchBlocks(t, lg2C) @@ -520,14 +540,14 @@ func BenchmarkLatchManagerReadOnlyMix(b *testing.B) { for _, size := range []int{1, 4, 16, 64, 128, 256} { b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) { var m Manager - ss := spans("a", "b", read) + ss := spans("a", "b", read, zeroTS) for i := 0; i < size; i++ { - _ = m.MustAcquire(ss, zeroTS) + _ = m.MustAcquire(ss) } b.ResetTimer() for i := 0; i < b.N; i++ { - _ = m.MustAcquire(ss, zeroTS) + _ = m.MustAcquire(ss) } }) } @@ -547,20 +567,17 @@ func BenchmarkLatchManagerReadWriteMix(b *testing.B) { if bytes.Compare(a, b) > 0 { a, b = b, a } - span := roachpb.Span{ - Key: roachpb.Key(a), - EndKey: roachpb.Key(b), - } + span := roachpb.Span{Key: a, EndKey: b} access := spanset.SpanReadOnly if i%(readsPerWrite+1) == 0 { access = spanset.SpanReadWrite } - spans[i].Add(access, span) + spans[i].AddNonMVCC(access, span) } b.ResetTimer() for i := range spans { - lg, snap := m.sequence(&spans[i], zeroTS) + lg, snap := m.sequence(&spans[i]) snap.close() if len(lgBuf) == cap(lgBuf) { m.Release(<-lgBuf) diff --git a/pkg/storage/spanset/batch.go b/pkg/storage/spanset/batch.go index 05855ff2f98d..3c6ec8a91dd1 100644 --- a/pkg/storage/spanset/batch.go +++ b/pkg/storage/spanset/batch.go @@ -24,6 +24,15 @@ type Iterator struct { i engine.Iterator spans *SpanSet + // spansOnly controls whether or not timestamps associated with the + // spans are considered when ensuring access. If set to true, + // only span boundaries are checked. + spansOnly bool + + // Timestamp the access is taking place. If timestamp is zero, access is + // considered non-MVCC. If spansOnly is set to true, ts is not consulted. + ts hlc.Timestamp + // Seeking to an invalid key puts the iterator in an error state. err error // Reaching an out-of-bounds key with Next/Prev invalidates the @@ -34,163 +43,220 @@ type Iterator struct { var _ engine.Iterator = &Iterator{} // NewIterator constructs an iterator that verifies access of the underlying -// iterator against the given spans. +// iterator against the given SpanSet. Timestamps associated with the spans +// in the spanset are not considered, only the span boundaries are checked. func NewIterator(iter engine.Iterator, spans *SpanSet) *Iterator { - return &Iterator{ - i: iter, - spans: spans, - } + return &Iterator{i: iter, spans: spans, spansOnly: true} +} + +// NewIteratorAt constructs an iterator that verifies access of the underlying +// iterator against the given SpanSet at the given timestamp. +func NewIteratorAt(iter engine.Iterator, spans *SpanSet, ts hlc.Timestamp) *Iterator { + return &Iterator{i: iter, spans: spans, ts: ts} } // Stats is part of the engine.Iterator interface. -func (s *Iterator) Stats() engine.IteratorStats { - return s.i.Stats() +func (i *Iterator) Stats() engine.IteratorStats { + return i.i.Stats() } // Close is part of the engine.Iterator interface. -func (s *Iterator) Close() { - s.i.Close() +func (i *Iterator) Close() { + i.i.Close() } // Iterator returns the underlying engine.Iterator. -func (s *Iterator) Iterator() engine.Iterator { - return s.i +func (i *Iterator) Iterator() engine.Iterator { + return i.i } // Seek is part of the engine.Iterator interface. -func (s *Iterator) Seek(key engine.MVCCKey) { - s.err = s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}) - if s.err == nil { - s.invalid = false +func (i *Iterator) Seek(key engine.MVCCKey) { + if i.spansOnly { + i.err = i.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}) + } else { + i.err = i.spans.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: key.Key}, i.ts) + } + if i.err == nil { + i.invalid = false } - s.i.Seek(key) + i.i.Seek(key) } // SeekReverse is part of the engine.Iterator interface. -func (s *Iterator) SeekReverse(key engine.MVCCKey) { - s.err = s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}) - if s.err == nil { - s.invalid = false +func (i *Iterator) SeekReverse(key engine.MVCCKey) { + if i.spansOnly { + i.err = i.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}) + } else { + i.err = i.spans.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: key.Key}, i.ts) } - s.i.SeekReverse(key) + if i.err == nil { + i.invalid = false + } + i.i.SeekReverse(key) } // Valid is part of the engine.Iterator interface. -func (s *Iterator) Valid() (bool, error) { - if s.err != nil { - return false, s.err +func (i *Iterator) Valid() (bool, error) { + if i.err != nil { + return false, i.err } - ok, err := s.i.Valid() + ok, err := i.i.Valid() if err != nil { - return false, s.err + return false, i.err } - return ok && !s.invalid, nil + return ok && !i.invalid, nil } // Next is part of the engine.Iterator interface. -func (s *Iterator) Next() { - s.i.Next() - if s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: s.UnsafeKey().Key}) != nil { - s.invalid = true +func (i *Iterator) Next() { + i.i.Next() + if i.spansOnly { + if i.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: i.UnsafeKey().Key}) != nil { + i.invalid = true + } + } else { + if i.spans.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: i.UnsafeKey().Key}, i.ts) != nil { + i.invalid = true + } } } // Prev is part of the engine.Iterator interface. -func (s *Iterator) Prev() { - s.i.Prev() - if s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: s.UnsafeKey().Key}) != nil { - s.invalid = true +func (i *Iterator) Prev() { + i.i.Prev() + if i.spansOnly { + if i.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: i.UnsafeKey().Key}) != nil { + i.invalid = true + } + } else { + if i.spans.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: i.UnsafeKey().Key}, i.ts) != nil { + i.invalid = true + } } } // NextKey is part of the engine.Iterator interface. -func (s *Iterator) NextKey() { - s.i.NextKey() - if s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: s.UnsafeKey().Key}) != nil { - s.invalid = true +func (i *Iterator) NextKey() { + i.i.NextKey() + if i.spansOnly { + if i.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: i.UnsafeKey().Key}) != nil { + i.invalid = true + } + } else { + if i.spans.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: i.UnsafeKey().Key}, i.ts) != nil { + i.invalid = true + } } } // Key is part of the engine.Iterator interface. -func (s *Iterator) Key() engine.MVCCKey { - return s.i.Key() +func (i *Iterator) Key() engine.MVCCKey { + return i.i.Key() } // Value is part of the engine.Iterator interface. -func (s *Iterator) Value() []byte { - return s.i.Value() +func (i *Iterator) Value() []byte { + return i.i.Value() } // ValueProto is part of the engine.Iterator interface. -func (s *Iterator) ValueProto(msg protoutil.Message) error { - return s.i.ValueProto(msg) +func (i *Iterator) ValueProto(msg protoutil.Message) error { + return i.i.ValueProto(msg) } // UnsafeKey is part of the engine.Iterator interface. -func (s *Iterator) UnsafeKey() engine.MVCCKey { - return s.i.UnsafeKey() +func (i *Iterator) UnsafeKey() engine.MVCCKey { + return i.i.UnsafeKey() } // UnsafeValue is part of the engine.Iterator interface. -func (s *Iterator) UnsafeValue() []byte { - return s.i.UnsafeValue() +func (i *Iterator) UnsafeValue() []byte { + return i.i.UnsafeValue() } // ComputeStats is part of the engine.Iterator interface. -func (s *Iterator) ComputeStats( +func (i *Iterator) ComputeStats( start, end roachpb.Key, nowNanos int64, ) (enginepb.MVCCStats, error) { - if err := s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}); err != nil { - return enginepb.MVCCStats{}, err + if i.spansOnly { + if err := i.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}); err != nil { + return enginepb.MVCCStats{}, err + } + } else { + if err := i.spans.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}, i.ts); err != nil { + return enginepb.MVCCStats{}, err + } } - return s.i.ComputeStats(start, end, nowNanos) + return i.i.ComputeStats(start, end, nowNanos) } // FindSplitKey is part of the engine.Iterator interface. -func (s *Iterator) FindSplitKey( +func (i *Iterator) FindSplitKey( start, end, minSplitKey roachpb.Key, targetSize int64, ) (engine.MVCCKey, error) { - if err := s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}); err != nil { - return engine.MVCCKey{}, err + if i.spansOnly { + if err := i.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}); err != nil { + return engine.MVCCKey{}, err + } + } else { + if err := i.spans.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}, i.ts); err != nil { + return engine.MVCCKey{}, err + } } - return s.i.FindSplitKey(start, end, minSplitKey, targetSize) + return i.i.FindSplitKey(start, end, minSplitKey, targetSize) } // CheckForKeyCollisions is part of the engine.Iterator interface. -func (s *Iterator) CheckForKeyCollisions( +func (i *Iterator) CheckForKeyCollisions( sstData []byte, start, end roachpb.Key, ) (enginepb.MVCCStats, error) { - return s.i.CheckForKeyCollisions(sstData, start, end) + return i.i.CheckForKeyCollisions(sstData, start, end) } // MVCCGet is part of the engine.Iterator interface. -func (s *Iterator) MVCCGet( +func (i *Iterator) MVCCGet( key roachpb.Key, timestamp hlc.Timestamp, opts engine.MVCCGetOptions, ) (*roachpb.Value, *roachpb.Intent, error) { - if err := s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key}); err != nil { - return nil, nil, err + if i.spansOnly { + if err := i.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key}); err != nil { + return nil, nil, err + } + } else { + if err := i.spans.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: key}, timestamp); err != nil { + return nil, nil, err + } } - return s.i.MVCCGet(key, timestamp, opts) + return i.i.MVCCGet(key, timestamp, opts) } // MVCCScan is part of the engine.Iterator interface. -func (s *Iterator) MVCCScan( +func (i *Iterator) MVCCScan( start, end roachpb.Key, max int64, timestamp hlc.Timestamp, opts engine.MVCCScanOptions, ) (kvData [][]byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error) { - if err := s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}); err != nil { - return nil, 0, nil, nil, err + if i.spansOnly { + if err := i.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}); err != nil { + return nil, 0, nil, nil, err + } + } else { + if err := i.spans.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}, timestamp); err != nil { + return nil, 0, nil, nil, err + } } - return s.i.MVCCScan(start, end, max, timestamp, opts) + return i.i.MVCCScan(start, end, max, timestamp, opts) } // SetUpperBound is part of the engine.Iterator interface. -func (s *Iterator) SetUpperBound(key roachpb.Key) { - s.i.SetUpperBound(key) +func (i *Iterator) SetUpperBound(key roachpb.Key) { + i.i.SetUpperBound(key) } type spanSetReader struct { r engine.Reader spans *SpanSet + + spansOnly bool + ts hlc.Timestamp } var _ engine.Reader = spanSetReader{} @@ -204,8 +270,14 @@ func (s spanSetReader) Closed() bool { } func (s spanSetReader) Get(key engine.MVCCKey) ([]byte, error) { - if err := s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}); err != nil { - return nil, err + if s.spansOnly { + if err := s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}); err != nil { + return nil, err + } + } else { + if err := s.spans.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: key.Key}, s.ts); err != nil { + return nil, err + } } //lint:ignore SA1019 implementing deprecated interface function (Get) is OK return s.r.Get(key) @@ -214,8 +286,14 @@ func (s spanSetReader) Get(key engine.MVCCKey) ([]byte, error) { func (s spanSetReader) GetProto( key engine.MVCCKey, msg protoutil.Message, ) (bool, int64, int64, error) { - if err := s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}); err != nil { - return false, 0, 0, err + if s.spansOnly { + if err := s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}); err != nil { + return false, 0, 0, err + } + } else { + if err := s.spans.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: key.Key}, s.ts); err != nil { + return false, 0, 0, err + } } //lint:ignore SA1019 implementing deprecated interface function (GetProto) is OK return s.r.GetProto(key, msg) @@ -224,14 +302,23 @@ func (s spanSetReader) GetProto( func (s spanSetReader) Iterate( start, end roachpb.Key, f func(engine.MVCCKeyValue) (bool, error), ) error { - if err := s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}); err != nil { - return err + if s.spansOnly { + if err := s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}); err != nil { + return err + } + } else { + if err := s.spans.CheckAllowedAt(SpanReadOnly, roachpb.Span{Key: start, EndKey: end}, s.ts); err != nil { + return err + } } return s.r.Iterate(start, end, f) } func (s spanSetReader) NewIterator(opts engine.IterOptions) engine.Iterator { - return &Iterator{s.r.NewIterator(opts), s.spans, nil, false} + if s.spansOnly { + return NewIterator(s.r.NewIterator(opts), s.spans) + } + return NewIteratorAt(s.r.NewIterator(opts), s.spans, s.ts) } // GetDBEngine recursively searches for the underlying rocksDB engine. @@ -259,6 +346,9 @@ func getSpanReader(r ReadWriter, span roachpb.Span) engine.Reader { type spanSetWriter struct { w engine.Writer spans *SpanSet + + spansOnly bool + ts hlc.Timestamp } var _ engine.Writer = spanSetWriter{} @@ -269,43 +359,79 @@ func (s spanSetWriter) ApplyBatchRepr(repr []byte, sync bool) error { } func (s spanSetWriter) Clear(key engine.MVCCKey) error { - if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil { - return err + if s.spansOnly { + if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil { + return err + } + } else { + if err := s.spans.CheckAllowedAt(SpanReadWrite, roachpb.Span{Key: key.Key}, s.ts); err != nil { + return err + } } return s.w.Clear(key) } func (s spanSetWriter) SingleClear(key engine.MVCCKey) error { - if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil { - return err + if s.spansOnly { + if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil { + return err + } + } else { + if err := s.spans.CheckAllowedAt(SpanReadWrite, roachpb.Span{Key: key.Key}, s.ts); err != nil { + return err + } } return s.w.SingleClear(key) } func (s spanSetWriter) ClearRange(start, end engine.MVCCKey) error { - if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: start.Key, EndKey: end.Key}); err != nil { - return err + if s.spansOnly { + if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: start.Key, EndKey: end.Key}); err != nil { + return err + } + } else { + if err := s.spans.CheckAllowedAt(SpanReadWrite, roachpb.Span{Key: start.Key, EndKey: end.Key}, s.ts); err != nil { + return err + } } return s.w.ClearRange(start, end) } func (s spanSetWriter) ClearIterRange(iter engine.Iterator, start, end roachpb.Key) error { - if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: start, EndKey: end}); err != nil { - return err + if s.spansOnly { + if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: start, EndKey: end}); err != nil { + return err + } + } else { + if err := s.spans.CheckAllowedAt(SpanReadWrite, roachpb.Span{Key: start, EndKey: end}, s.ts); err != nil { + return err + } } return s.w.ClearIterRange(iter, start, end) } func (s spanSetWriter) Merge(key engine.MVCCKey, value []byte) error { - if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil { - return err + if s.spansOnly { + if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil { + return err + } + } else { + if err := s.spans.CheckAllowedAt(SpanReadWrite, roachpb.Span{Key: key.Key}, s.ts); err != nil { + return err + } } return s.w.Merge(key, value) } func (s spanSetWriter) Put(key engine.MVCCKey, value []byte) error { - if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil { - return err + if s.spansOnly { + if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil { + return err + } + } else { + if err := s.spans.CheckAllowedAt(SpanReadWrite, roachpb.Span{Key: key.Key}, s.ts); err != nil { + return err + } } return s.w.Put(key, value) } @@ -330,14 +456,15 @@ var _ engine.ReadWriter = ReadWriter{} func makeSpanSetReadWriter(rw engine.ReadWriter, spans *SpanSet) ReadWriter { return ReadWriter{ - spanSetReader{ - r: rw, - spans: spans, - }, - spanSetWriter{ - w: rw, - spans: spans, - }, + spanSetReader: spanSetReader{r: rw, spans: spans, spansOnly: true}, + spanSetWriter: spanSetWriter{w: rw, spans: spans, spansOnly: true}, + } +} + +func makeSpanSetReadWriterAt(rw engine.ReadWriter, spans *SpanSet, ts hlc.Timestamp) ReadWriter { + return ReadWriter{ + spanSetReader: spanSetReader{r: rw, spans: spans, ts: ts}, + spanSetWriter: spanSetWriter{w: rw, spans: spans, ts: ts}, } } @@ -347,10 +474,20 @@ func NewReadWriter(rw engine.ReadWriter, spans *SpanSet) engine.ReadWriter { return makeSpanSetReadWriter(rw, spans) } +// NewReadWriterAt returns an engine.ReadWriter that asserts access of the +// underlying ReadWriter against the given SpanSet at a given timestamp. +// If zero timestamp is provided, accesses are considered non-MVCC. +func NewReadWriterAt(rw engine.ReadWriter, spans *SpanSet, ts hlc.Timestamp) engine.ReadWriter { + return makeSpanSetReadWriterAt(rw, spans, ts) +} + type spanSetBatch struct { ReadWriter b engine.Batch spans *SpanSet + + spansOnly bool + ts hlc.Timestamp } var _ engine.Batch = spanSetBatch{} @@ -360,7 +497,10 @@ func (s spanSetBatch) Commit(sync bool) error { } func (s spanSetBatch) Distinct() engine.ReadWriter { - return makeSpanSetReadWriter(s.b.Distinct(), s.spans) + if s.spansOnly { + return NewReadWriter(s.b.Distinct(), s.spans) + } + return NewReadWriterAt(s.b.Distinct(), s.spans, s.ts) } func (s spanSetBatch) Empty() bool { @@ -376,11 +516,25 @@ func (s spanSetBatch) Repr() []byte { } // NewBatch returns an engine.Batch that asserts access of the underlying -// Batch against the given SpanSet. +// Batch against the given SpanSet. We only consider span boundaries, associated +// timestamps are not considered. func NewBatch(b engine.Batch, spans *SpanSet) engine.Batch { return &spanSetBatch{ - makeSpanSetReadWriter(b, spans), - b, - spans, + ReadWriter: makeSpanSetReadWriter(b, spans), + b: b, + spans: spans, + spansOnly: true, + } +} + +// NewBatchAt returns an engine.Batch that asserts access of the underlying +// Batch against the given SpanSet at the given timestamp. +// If the zero timestamp is used, all accesses are considered non-MVCC. +func NewBatchAt(b engine.Batch, spans *SpanSet, ts hlc.Timestamp) engine.Batch { + return &spanSetBatch{ + ReadWriter: makeSpanSetReadWriterAt(b, spans, ts), + b: b, + spans: spans, + ts: ts, } } diff --git a/pkg/storage/spanset/merge.go b/pkg/storage/spanset/merge.go new file mode 100644 index 000000000000..b8fa4aa88795 --- /dev/null +++ b/pkg/storage/spanset/merge.go @@ -0,0 +1,123 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanset + +import "sort" + +type sortedSpans []Span + +func (s sortedSpans) Less(i, j int) bool { + // Sort first on the start key and second on the end key. Note that we're + // relying on EndKey = nil (and len(EndKey) == 0) sorting before other + // EndKeys. + c := s[i].Key.Compare(s[j].Key) + if c != 0 { + return c < 0 + } + return s[i].EndKey.Compare(s[j].EndKey) < 0 +} + +func (s sortedSpans) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func (s sortedSpans) Len() int { + return len(s) +} + +// mergeSpans sorts the given spans and merges ones with overlapping +// spans and equal access timestamps. The implementation is a copy of +// roachpb.MergeSpans. +// +// Returns true iff all of the spans are distinct. +// The input spans are not safe for re-use. +func mergeSpans(latches []Span) ([]Span, bool) { + if len(latches) == 0 { + return latches, true + } + + sort.Sort(sortedSpans(latches)) + + // We build up the resulting slice of merged spans in place. This is safe + // because "r" grows by at most 1 element on each iteration, staying abreast + // or behind the iteration over "latches". + r := latches[:1] + distinct := true + + for _, cur := range latches[1:] { + prev := &r[len(r)-1] + if len(cur.EndKey) == 0 && len(prev.EndKey) == 0 { + if cur.Key.Compare(prev.Key) != 0 { + // [a, nil] merge [b, nil] + r = append(r, cur) + } else { + // [a, nil] merge [a, nil] + if cur.Timestamp != prev.Timestamp { + r = append(r, cur) + } + distinct = false + } + continue + } + if len(prev.EndKey) == 0 { + if cur.Key.Compare(prev.Key) == 0 { + // [a, nil] merge [a, b] + if cur.Timestamp != prev.Timestamp { + r = append(r, cur) + } else { + prev.EndKey = cur.EndKey + } + distinct = false + } else { + // [a, nil] merge [b, c] + r = append(r, cur) + } + continue + } + + if c := prev.EndKey.Compare(cur.Key); c >= 0 { + if cur.EndKey != nil { + if prev.EndKey.Compare(cur.EndKey) < 0 { + // [a, c] merge [b, d] + if cur.Timestamp != prev.Timestamp { + r = append(r, cur) + } else { + prev.EndKey = cur.EndKey + } + if c > 0 { + distinct = false + } + } else { + // [a, c] merge [b, c] + if cur.Timestamp != prev.Timestamp { + r = append(r, cur) + } + distinct = false + } + } else if c == 0 { + // [a, b] merge [b, nil] + if cur.Timestamp != prev.Timestamp { + r = append(r, cur) + } + prev.EndKey = cur.Key.Next() + } else { + // [a, c] merge [b, nil] + if cur.Timestamp != prev.Timestamp { + r = append(r, cur) + } + distinct = false + } + continue + } + r = append(r, cur) + } + return r, distinct +} diff --git a/pkg/storage/spanset/spanset.go b/pkg/storage/spanset/spanset.go index b78b5f1bfba7..edc223d35fe3 100644 --- a/pkg/storage/spanset/spanset.go +++ b/pkg/storage/spanset/spanset.go @@ -17,11 +17,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/pkg/errors" ) -// SpanAccess records the intended mode of access in SpanSet. +// SpanAccess records the intended mode of access in a SpanSet. type SpanAccess int // Constants for SpanAccess. Higher-valued accesses imply lower-level ones. @@ -65,21 +66,29 @@ func (a SpanScope) String() string { } } -// SpanSet tracks the set of key spans touched by a command. The set -// is divided into subsets for access type (read-only or read/write) -// and key scope (local or global; used to facilitate use by the -// separate local and global latches). +// Span is used to represent a keyspan accessed by a request at a given +// timestamp. A zero timestamp indicates it's a non-MVCC access. +type Span struct { + roachpb.Span + Timestamp hlc.Timestamp +} + +// SpanSet tracks the set of key spans touched by a command, broken into MVCC +// and non-MVCC accesses. The set is divided into subsets for access type +// (read-only or read/write) and key scope (local or global; used to facilitate +// use by the separate local and global latches). type SpanSet struct { - spans [NumSpanAccess][NumSpanScope][]roachpb.Span + spans [NumSpanAccess][NumSpanScope][]Span } -// String prints a string representation of the span set. -func (ss *SpanSet) String() string { +// String prints a string representation of the SpanSet. +func (s *SpanSet) String() string { var buf strings.Builder - for i := SpanAccess(0); i < NumSpanAccess; i++ { - for j := SpanScope(0); j < NumSpanScope; j++ { - for _, span := range ss.GetSpans(i, j) { - fmt.Fprintf(&buf, "%s %s: %s\n", i, j, span) + for sa := SpanAccess(0); sa < NumSpanAccess; sa++ { + for ss := SpanScope(0); ss < NumSpanScope; ss++ { + for _, cur := range s.GetSpans(sa, ss) { + fmt.Fprintf(&buf, "%s %s: %s at %s\n", + sa, ss, cur.Span.String(), cur.Timestamp.String()) } } } @@ -87,69 +96,81 @@ func (ss *SpanSet) String() string { } // Len returns the total number of spans tracked across all accesses and scopes. -func (ss *SpanSet) Len() int { +func (s *SpanSet) Len() int { var count int - for i := SpanAccess(0); i < NumSpanAccess; i++ { - for j := SpanScope(0); j < NumSpanScope; j++ { - count += len(ss.GetSpans(i, j)) + for sa := SpanAccess(0); sa < NumSpanAccess; sa++ { + for ss := SpanScope(0); ss < NumSpanScope; ss++ { + count += len(s.GetSpans(sa, ss)) } } return count } -// Reserve space for N additional keys. -func (ss *SpanSet) Reserve(access SpanAccess, scope SpanScope, n int) { - existing := ss.spans[access][scope] - ss.spans[access][scope] = make([]roachpb.Span, len(existing), n+cap(existing)) - copy(ss.spans[access][scope], existing) +// Reserve space for N additional spans. +func (s *SpanSet) Reserve(access SpanAccess, scope SpanScope, n int) { + existing := s.spans[access][scope] + s.spans[access][scope] = make([]Span, len(existing), n+cap(existing)) + copy(s.spans[access][scope], existing) +} + +// AddNonMVCC adds a non-MVCC span to the span set. This should typically +// local keys. +func (s *SpanSet) AddNonMVCC(access SpanAccess, span roachpb.Span) { + s.AddMVCC(access, span, hlc.Timestamp{}) } -// Add a span to the set. -func (ss *SpanSet) Add(access SpanAccess, span roachpb.Span) { +// AddMVCC adds an MVCC span to the span set to be accessed at the given +// timestamp. This should typically be used for MVCC keys, user keys for e.g. +func (s *SpanSet) AddMVCC(access SpanAccess, span roachpb.Span, timestamp hlc.Timestamp) { scope := SpanGlobal if keys.IsLocal(span.Key) { scope = SpanLocal } - ss.spans[access][scope] = append(ss.spans[access][scope], span) + + s.spans[access][scope] = append(s.spans[access][scope], Span{Span: span, Timestamp: timestamp}) } // SortAndDedup sorts the spans in the SpanSet and removes any duplicates. -func (ss *SpanSet) SortAndDedup() { - for i := SpanAccess(0); i < NumSpanAccess; i++ { - for j := SpanScope(0); j < NumSpanScope; j++ { - ss.spans[i][j], _ /* distinct */ = roachpb.MergeSpans(ss.spans[i][j]) +func (s *SpanSet) SortAndDedup() { + for sa := SpanAccess(0); sa < NumSpanAccess; sa++ { + for ss := SpanScope(0); ss < NumSpanScope; ss++ { + s.spans[sa][ss], _ /* distinct */ = mergeSpans(s.spans[sa][ss]) } } } // GetSpans returns a slice of spans with the given parameters. -func (ss *SpanSet) GetSpans(access SpanAccess, scope SpanScope) []roachpb.Span { - return ss.spans[access][scope] +func (s *SpanSet) GetSpans(access SpanAccess, scope SpanScope) []Span { + return s.spans[access][scope] } // BoundarySpan returns a span containing all the spans with the given params. -func (ss *SpanSet) BoundarySpan(scope SpanScope) roachpb.Span { +func (s *SpanSet) BoundarySpan(scope SpanScope) roachpb.Span { var boundary roachpb.Span - for i := SpanAccess(0); i < NumSpanAccess; i++ { - for _, span := range ss.spans[i][scope] { + for sa := SpanAccess(0); sa < NumSpanAccess; sa++ { + for _, cur := range s.GetSpans(sa, scope) { if !boundary.Valid() { - boundary = span + boundary = cur.Span continue } - boundary = boundary.Combine(span) + boundary = boundary.Combine(cur.Span) } } return boundary } -// AssertAllowed calls checkAllowed and fatals if the access is not allowed. -func (ss *SpanSet) AssertAllowed(access SpanAccess, span roachpb.Span) { - if err := ss.CheckAllowed(access, span); err != nil { +// AssertAllowed calls CheckAllowed and fatals if the access is not allowed. +// Timestamps associated with the spans in the spanset are not considered, +// only the span boundaries are checked. +func (s *SpanSet) AssertAllowed(access SpanAccess, span roachpb.Span) { + if err := s.CheckAllowed(access, span); err != nil { log.Fatal(context.TODO(), err) } } -// CheckAllowed returns an error if the access is not allowed. +// CheckAllowed returns an error if the access is not allowed over the given +// keyspan. Timestamps associated with the spans in the spanset are not +// considered, only the span boundaries are checked. // // TODO(irfansharif): This does not currently work for spans that straddle // across multiple added spans. Specifically a spanset with spans [a-c) and @@ -157,33 +178,76 @@ func (ss *SpanSet) AssertAllowed(access SpanAccess, span roachpb.Span) { // fail at checking if read only access over the span [a-d) was requested. This // is also a problem if the added spans were read only and the spanset wasn't // already SortAndDedup-ed. -func (ss *SpanSet) CheckAllowed(access SpanAccess, span roachpb.Span) error { +func (s *SpanSet) CheckAllowed(access SpanAccess, span roachpb.Span) error { scope := SpanGlobal if keys.IsLocal(span.Key) { scope = SpanLocal } + for ac := access; ac < NumSpanAccess; ac++ { - for _, s := range ss.spans[ac][scope] { - if s.Contains(span) { + for _, cur := range s.spans[ac][scope] { + if cur.Contains(span) { return nil } } } - return errors.Errorf("cannot %s undeclared span %s\ndeclared:\n%s", access, span, ss) + return errors.Errorf("cannot %s undeclared span %s\ndeclared:\n%s", access, span, s) +} + +// CheckAllowedAt returns an error if the access is not allowed at over the given keyspan +// at the given timestamp. +func (s *SpanSet) CheckAllowedAt( + access SpanAccess, span roachpb.Span, timestamp hlc.Timestamp, +) error { + scope := SpanGlobal + if keys.IsLocal(span.Key) { + scope = SpanLocal + } + + for ac := access; ac < NumSpanAccess; ac++ { + for _, cur := range s.spans[ac][scope] { + if cur.Contains(span) { + if cur.Timestamp.IsEmpty() { + // When the span is acquired as non-MVCC (i.e. with an empty + // timestamp), it's equivalent to a read/write mutex where we don't + // consider access timestamps. + return nil + } + + if access == SpanReadWrite { + // Writes under a write span with an associated timestamp at that + // specific timestamp. + if timestamp == cur.Timestamp { + return nil + } + } else { + // Read spans acquired at a specific timestamp only allow reads at + // that timestamp and below. Non-MVCC access is not allowed. + if !timestamp.IsEmpty() && (timestamp.Less(cur.Timestamp) || timestamp == cur.Timestamp) { + return nil + } + } + } + } + } + + return errors.Errorf("cannot %s undeclared span %s at %s\ndeclared:\n%s", + access, span, timestamp.String(), s) } // Validate returns an error if any spans that have been added to the set // are invalid. -func (ss *SpanSet) Validate() error { - for _, accessSpans := range ss.spans { - for _, spans := range accessSpans { - for _, span := range spans { - if len(span.EndKey) > 0 && span.Key.Compare(span.EndKey) >= 0 { - return errors.Errorf("inverted span %s %s", span.Key, span.EndKey) +func (s *SpanSet) Validate() error { + for sa := SpanAccess(0); sa < NumSpanAccess; sa++ { + for ss := SpanScope(0); ss < NumSpanScope; ss++ { + for _, cur := range s.GetSpans(sa, ss) { + if len(cur.EndKey) > 0 && cur.Key.Compare(cur.EndKey) >= 0 { + return errors.Errorf("inverted span %s %s", cur.Key, cur.EndKey) } } } } + return nil } diff --git a/pkg/storage/spanset/spanset_test.go b/pkg/storage/spanset/spanset_test.go index d6d4286e591c..a986fd596ee5 100644 --- a/pkg/storage/spanset/spanset_test.go +++ b/pkg/storage/spanset/spanset_test.go @@ -16,6 +16,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" ) @@ -25,18 +27,20 @@ func TestSpanSetGetSpansScope(t *testing.T) { defer leaktest.AfterTest(t)() var ss SpanSet - ss.Add(SpanReadOnly, roachpb.Span{Key: roachpb.Key("a")}) - ss.Add(SpanReadOnly, roachpb.Span{Key: keys.RangeLastGCKey(1)}) - ss.Add(SpanReadOnly, roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}) + ss.AddNonMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("a")}) + ss.AddNonMVCC(SpanReadOnly, roachpb.Span{Key: keys.RangeLastGCKey(1)}) + ss.AddNonMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}) - exp := []roachpb.Span{{Key: keys.RangeLastGCKey(1)}} + exp := []Span{ + {Span: roachpb.Span{Key: keys.RangeLastGCKey(1)}}, + } if act := ss.GetSpans(SpanReadOnly, SpanLocal); !reflect.DeepEqual(act, exp) { t.Errorf("get local spans: got %v, expected %v", act, exp) } - exp = []roachpb.Span{ - {Key: roachpb.Key("a")}, - {Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}, + exp = []Span{ + {Span: roachpb.Span{Key: roachpb.Key("a")}}, + {Span: roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}}, } if act := ss.GetSpans(SpanReadOnly, SpanGlobal); !reflect.DeepEqual(act, exp) { @@ -44,14 +48,14 @@ func TestSpanSetGetSpansScope(t *testing.T) { } } -// Test that CheckAllowed properly enforces boundaries. +// Test that CheckAllowed properly enforces span boundaries. func TestSpanSetCheckAllowedBoundaries(t *testing.T) { defer leaktest.AfterTest(t)() var ss SpanSet - ss.Add(SpanReadOnly, roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")}) - ss.Add(SpanReadOnly, roachpb.Span{Key: roachpb.Key("g")}) - ss.Add(SpanReadOnly, roachpb.Span{Key: roachpb.Key("k"), EndKey: roachpb.Key("q")}) + ss.AddNonMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")}) + ss.AddNonMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("g")}) + ss.AddNonMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("k"), EndKey: roachpb.Key("q")}) allowed := []roachpb.Span{ // Exactly as declared. @@ -104,6 +108,116 @@ func TestSpanSetCheckAllowedBoundaries(t *testing.T) { } } +// Test that CheckAllowedAt properly enforces timestamp control. +func TestSpanSetCheckAllowedAtTimestamps(t *testing.T) { + defer leaktest.AfterTest(t)() + + var ss SpanSet + ss.AddMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")}, hlc.Timestamp{WallTime: 2}) + ss.AddMVCC(SpanReadOnly, roachpb.Span{Key: roachpb.Key("g")}, hlc.Timestamp{WallTime: 2}) + ss.AddMVCC(SpanReadWrite, roachpb.Span{Key: roachpb.Key("m"), EndKey: roachpb.Key("o")}, hlc.Timestamp{WallTime: 2}) + ss.AddMVCC(SpanReadWrite, roachpb.Span{Key: roachpb.Key("s")}, hlc.Timestamp{WallTime: 2}) + ss.AddNonMVCC(SpanReadWrite, roachpb.Span{Key: keys.RangeLastGCKey(1)}) + + var allowedRO = []struct { + span roachpb.Span + ts hlc.Timestamp + }{ + // Read access allowed for a subspan or included point at a timestamp + // equal to or below associated timestamp. + {roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")}, hlc.Timestamp{WallTime: 2}}, + {roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")}, hlc.Timestamp{WallTime: 1}}, + {roachpb.Span{Key: roachpb.Key("m"), EndKey: roachpb.Key("o")}, hlc.Timestamp{WallTime: 2}}, + {roachpb.Span{Key: roachpb.Key("m"), EndKey: roachpb.Key("o")}, hlc.Timestamp{WallTime: 1}}, + {roachpb.Span{Key: roachpb.Key("g")}, hlc.Timestamp{WallTime: 2}}, + {roachpb.Span{Key: roachpb.Key("g")}, hlc.Timestamp{WallTime: 1}}, + {roachpb.Span{Key: roachpb.Key("s")}, hlc.Timestamp{WallTime: 2}}, + {roachpb.Span{Key: roachpb.Key("s")}, hlc.Timestamp{WallTime: 1}}, + + // Local keys. + {roachpb.Span{Key: keys.RangeLastGCKey(1)}, hlc.Timestamp{}}, + {roachpb.Span{Key: keys.RangeLastGCKey(1)}, hlc.Timestamp{WallTime: 1}}, + } + for _, tc := range allowedRO { + if err := ss.CheckAllowedAt(SpanReadOnly, tc.span, tc.ts); err != nil { + t.Errorf("expected %s at %s to be allowed, but got error: %+v", tc.span, tc.ts, err) + } + } + + var allowedRW = []struct { + span roachpb.Span + ts hlc.Timestamp + }{ + // Write access allowed for a subspan or included point at exactly the + // declared timestamp. + {roachpb.Span{Key: roachpb.Key("m"), EndKey: roachpb.Key("o")}, hlc.Timestamp{WallTime: 2}}, + {roachpb.Span{Key: roachpb.Key("s")}, hlc.Timestamp{WallTime: 2}}, + + // Points within the non-zero-length span. + {roachpb.Span{Key: roachpb.Key("n")}, hlc.Timestamp{WallTime: 2}}, + + // Sub span at the declared timestamp. + {roachpb.Span{Key: roachpb.Key("m"), EndKey: roachpb.Key("n")}, hlc.Timestamp{WallTime: 2}}, + + // Local keys. + {roachpb.Span{Key: keys.RangeLastGCKey(1)}, hlc.Timestamp{}}, + } + for _, tc := range allowedRW { + if err := ss.CheckAllowedAt(SpanReadWrite, tc.span, tc.ts); err != nil { + t.Errorf("expected %s at %s to be allowed, but got error: %+v", tc.span, tc.ts, err) + } + } + + readErr := "cannot read undeclared span" + writeErr := "cannot write undeclared span" + + var disallowedRO = []struct { + span roachpb.Span + ts hlc.Timestamp + }{ + // Read access disallowed for subspan or included point at timestamp greater + // than the associated timestamp. + {roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")}, hlc.Timestamp{WallTime: 3}}, + {roachpb.Span{Key: roachpb.Key("m"), EndKey: roachpb.Key("o")}, hlc.Timestamp{WallTime: 3}}, + {roachpb.Span{Key: roachpb.Key("g")}, hlc.Timestamp{WallTime: 3}}, + {roachpb.Span{Key: roachpb.Key("s")}, hlc.Timestamp{WallTime: 3}}, + } + for _, tc := range disallowedRO { + if err := ss.CheckAllowedAt(SpanReadOnly, tc.span, tc.ts); !testutils.IsError(err, readErr) { + t.Errorf("expected %s at %s to be disallowed", tc.span, tc.ts) + } + } + + var disallowedRW = []struct { + span roachpb.Span + ts hlc.Timestamp + }{ + // Write access disallowed for subspan or included point at timestamp + // different from the associated timestamp. + {roachpb.Span{Key: roachpb.Key("m"), EndKey: roachpb.Key("o")}, hlc.Timestamp{WallTime: 1}}, + {roachpb.Span{Key: roachpb.Key("m"), EndKey: roachpb.Key("o")}, hlc.Timestamp{WallTime: 3}}, + {roachpb.Span{Key: roachpb.Key("s")}, hlc.Timestamp{WallTime: 1}}, + {roachpb.Span{Key: roachpb.Key("s")}, hlc.Timestamp{WallTime: 3}}, + + // Read only spans. + {roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("d")}, hlc.Timestamp{WallTime: 2}}, + {roachpb.Span{Key: roachpb.Key("c")}, hlc.Timestamp{WallTime: 2}}, + + // Points within the non-zero-length span at a timestamp higher than what's + // declared. + {roachpb.Span{Key: roachpb.Key("n")}, hlc.Timestamp{WallTime: 3}}, + + // Sub spans at timestamps different from the one declared. + {roachpb.Span{Key: roachpb.Key("m"), EndKey: roachpb.Key("n")}, hlc.Timestamp{WallTime: 3}}, + {roachpb.Span{Key: roachpb.Key("m"), EndKey: roachpb.Key("n")}, hlc.Timestamp{WallTime: 1}}, + } + for _, tc := range disallowedRW { + if err := ss.CheckAllowedAt(SpanReadWrite, tc.span, tc.ts); !testutils.IsError(err, writeErr) { + t.Errorf("expected %s at %s to be disallowed", tc.span, tc.ts) + } + } +} + // Test that a span declared for write access also implies read // access, but not vice-versa. func TestSpanSetWriteImpliesRead(t *testing.T) { @@ -112,8 +226,8 @@ func TestSpanSetWriteImpliesRead(t *testing.T) { var ss SpanSet roSpan := roachpb.Span{Key: roachpb.Key("read-only")} rwSpan := roachpb.Span{Key: roachpb.Key("read-write")} - ss.Add(SpanReadOnly, roSpan) - ss.Add(SpanReadWrite, rwSpan) + ss.AddNonMVCC(SpanReadOnly, roSpan) + ss.AddNonMVCC(SpanReadWrite, rwSpan) if err := ss.CheckAllowed(SpanReadOnly, roSpan); err != nil { t.Errorf("expected to be allowed to read roSpan, error: %+v", err)