Skip to content

Commit

Permalink
storage: introduce fine-grained latch acquisition
Browse files Browse the repository at this point in the history
Towards #32583. The current mechanism in place for declaring latches
over MVCC keys only allows them to be declared at a fixed timestamp (on
a per request basis). For split commands for e.g., we're interested in
in declaring latches on different timestamps for the two sides of the
split. Specifically we're interested in declaring a non-MVCC read latch
over the LHS and a non-MVCC write latch over the RHS to be able to
service reads during splits (requests without timestamps, i.e. the empty
timestamp, are considered non-MVCC). The same ideas can be applied to
range merges and lease transfers.

This change allows requests to specify, during span declarations,
whether said span would require an MVCC or non-MVCC access. The access
mode would correspond to an MVCC or non-MVCC latch acquired over said
span depending. This would allow the aforementioned optimizations
possible.

Release note: None
  • Loading branch information
irfansharif committed Nov 4, 2019
1 parent b9dc16b commit 15d0c2d
Show file tree
Hide file tree
Showing 35 changed files with 1,160 additions and 398 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func declareKeysExport(
desc *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
) {
batcheval.DefaultDeclareKeys(desc, header, req, spans)
spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeLastGCKey(header.RangeID)})
spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeLastGCKey(header.RangeID)})
}

// evalExport dumps the requested keys into files of non-overlapping key ranges
Expand Down
249 changes: 245 additions & 4 deletions pkg/storage/batch_spanset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ import (
"github.com/pkg/errors"
)

func TestSpanSetBatch(t *testing.T) {
func TestSpanSetBatchBoundaries(t *testing.T) {
defer leaktest.AfterTest(t)()
eng := engine.NewDefaultInMem()
defer eng.Close()

var ss spanset.SpanSet
ss.Add(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")})
ss.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")})
outsideKey := engine.MakeMVCCMetadataKey(roachpb.Key("a"))
outsideKey2 := engine.MakeMVCCMetadataKey(roachpb.Key("b"))
outsideKey3 := engine.MakeMVCCMetadataKey(roachpb.Key("m"))
Expand All @@ -60,7 +60,7 @@ func TestSpanSetBatch(t *testing.T) {
}

// Writes outside the range fail. We try to cover all write methods
// in the failure case to make sure the checkAllowed call is
// in the failure case to make sure the CheckAllowed call is
// present, but we don't attempt successful versions of all
// methods since those are harder to set up.
isWriteSpanErr := func(err error) bool {
Expand Down Expand Up @@ -190,6 +190,247 @@ func TestSpanSetBatch(t *testing.T) {
}
}

func TestSpanSetBatchTimestamps(t *testing.T) {
defer leaktest.AfterTest(t)()
eng := engine.NewDefaultInMem()
defer eng.Close()

var ss spanset.SpanSet
ss.AddMVCC(spanset.SpanReadOnly,
roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")}, hlc.Timestamp{WallTime: 2})
ss.AddMVCC(spanset.SpanReadWrite,
roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("f")}, hlc.Timestamp{WallTime: 2})

rkey := engine.MakeMVCCMetadataKey(roachpb.Key("b"))
wkey := engine.MakeMVCCMetadataKey(roachpb.Key("e"))

value := []byte("value")

// Write value that we can try to read later.
if err := eng.Put(rkey, value); err != nil {
t.Fatalf("direct write failed: %+v", err)
}

batchNonMVCC := spanset.NewBatchAt(eng.NewBatch(), &ss, hlc.Timestamp{WallTime: 0})
defer batchNonMVCC.Close()

batchBefore := spanset.NewBatchAt(eng.NewBatch(), &ss, hlc.Timestamp{WallTime: 1})
defer batchBefore.Close()

batchDuring := spanset.NewBatchAt(eng.NewBatch(), &ss, hlc.Timestamp{WallTime: 2})
defer batchDuring.Close()

batchAfter := spanset.NewBatchAt(eng.NewBatch(), &ss, hlc.Timestamp{WallTime: 3})
defer batchAfter.Close()

// Writes.
if err := batchDuring.Put(wkey, value); err != nil {
t.Fatalf("failed to write inside the range at same ts as latch declaration: %+v", err)
}

for _, batch := range []engine.Batch{batchAfter, batchBefore, batchNonMVCC} {
if err := batch.Put(wkey, value); err == nil {
t.Fatalf("was able to write inside the range at ts greater than latch declaration: %+v", err)
}
}

// We try to cover all write methods in the failure case to make sure
// the CheckAllowedAt call is present, but we don't attempt to successful
// versions of all methods since those are harder to set up.
isWriteSpanErr := func(err error) bool {
return testutils.IsError(err, "cannot write undeclared span")
}

for _, batch := range []engine.Batch{batchAfter, batchBefore, batchNonMVCC} {
if err := batch.Clear(wkey); !isWriteSpanErr(err) {
t.Errorf("Clear: unexpected error %v", err)
}
{
iter := batch.NewIterator(engine.IterOptions{UpperBound: roachpb.KeyMax})
err := batch.ClearIterRange(iter, wkey.Key, wkey.Key)
iter.Close()
if !isWriteSpanErr(err) {
t.Errorf("ClearIterRange: unexpected error %v", err)
}
}
if err := batch.Merge(wkey, nil); !isWriteSpanErr(err) {
t.Errorf("Merge: unexpected error %v", err)
}
if err := batch.Put(wkey, nil); !isWriteSpanErr(err) {
t.Errorf("Put: unexpected error %v", err)
}
}

// Reads.
for _, batch := range []engine.Batch{batchBefore, batchDuring} {
//lint:ignore SA1019 historical usage of deprecated batch.Get is OK
if res, err := batch.Get(rkey); err != nil {
t.Errorf("failed to read inside the range: %+v", err)
} else if !bytes.Equal(res, value) {
t.Errorf("failed to read previously written value, got %q", res)
}
}

isReadSpanErr := func(err error) bool {
return testutils.IsError(err, "cannot read undeclared span")
}

for _, batch := range []engine.Batch{batchAfter, batchNonMVCC} {
//lint:ignore SA1019 historical usage of deprecated batch.Get is OK
if _, err := batch.Get(rkey); !isReadSpanErr(err) {
t.Errorf("Get: unexpected error %v", err)
}

//lint:ignore SA1019 historical usage of deprecated batch.GetProto is OK
if _, _, _, err := batch.GetProto(rkey, nil); !isReadSpanErr(err) {
t.Errorf("GetProto: unexpected error %v", err)
}
if err := batch.Iterate(rkey.Key, rkey.Key,
func(v engine.MVCCKeyValue) (bool, error) {
return false, errors.Errorf("unexpected callback: %v", v)
},
); !isReadSpanErr(err) {
t.Errorf("Iterate: unexpected error %v", err)
}
}
}

func TestSpanSetIteratorTimestamps(t *testing.T) {
defer leaktest.AfterTest(t)()
eng := engine.NewDefaultInMem()
defer eng.Close()

var ss spanset.SpanSet
ss.AddMVCC(spanset.SpanReadOnly, roachpb.Span{
Key: roachpb.Key("a"), EndKey: roachpb.Key("c")}, hlc.Timestamp{WallTime: 1})
ss.AddMVCC(spanset.SpanReadOnly, roachpb.Span{
Key: roachpb.Key("c"), EndKey: roachpb.Key("e")}, hlc.Timestamp{WallTime: 2})

k1, v1 := engine.MakeMVCCMetadataKey(roachpb.Key("b")), []byte("b-value")
k2, v2 := engine.MakeMVCCMetadataKey(roachpb.Key("d")), []byte("d-value")

// Write values that we can try to read later.
if err := eng.Put(k1, v1); err != nil {
t.Fatalf("direct write failed: %+v", err)
}
if err := eng.Put(k2, v2); err != nil {
t.Fatalf("direct write failed: %+v", err)
}

batchNonMVCC := spanset.NewBatchAt(eng.NewBatch(), &ss, hlc.Timestamp{WallTime: 0})
defer batchNonMVCC.Close()

batchAt1 := spanset.NewBatchAt(eng.NewBatch(), &ss, hlc.Timestamp{WallTime: 1})
defer batchAt1.Close()

batchAt2 := spanset.NewBatchAt(eng.NewBatch(), &ss, hlc.Timestamp{WallTime: 2})
defer batchAt2.Close()

batchAt3 := spanset.NewBatchAt(eng.NewBatch(), &ss, hlc.Timestamp{WallTime: 3})
defer batchAt3.Close()

func() {
// When accessing at t=1, we're able to read through latches declared at t=1 and t=2.
iter := batchAt1.NewIterator(engine.IterOptions{UpperBound: roachpb.KeyMax})
defer iter.Close()

iter.Seek(k1)
if ok, err := iter.Valid(); !ok {
t.Fatalf("expected valid iterator, err=%v", err)
}
if !reflect.DeepEqual(iter.Key(), k1) {
t.Fatalf("expected key %s, got %s", k1, iter.Key())
}

iter.Next()
if ok, err := iter.Valid(); !ok {
t.Fatalf("expected valid iterator, err=%v", err)
}
if !reflect.DeepEqual(iter.Key(), k2) {
t.Fatalf("expected key %s, got %s", k2, iter.Key())
}
}()

{
// When accessing at t=2, we're only able to read through the latch declared at t=2.
iter := batchAt2.NewIterator(engine.IterOptions{UpperBound: roachpb.KeyMax})
defer iter.Close()

iter.Seek(k1)
if ok, _ := iter.Valid(); ok {
t.Fatalf("expected invalid iterator; found valid at key %s", iter.Key())
}

iter.Seek(k2)
if ok, err := iter.Valid(); !ok {
t.Fatalf("expected valid iterator, err=%v", err)
}
if !reflect.DeepEqual(iter.Key(), k2) {
t.Fatalf("expected key %s, got %s", k2, iter.Key())
}
}

for _, batch := range []engine.Batch{batchAt3, batchNonMVCC} {
// When accessing at t=3, we're unable to read through any of the declared latches.
// Same is true when accessing without a timestamp.
iter := batch.NewIterator(engine.IterOptions{UpperBound: roachpb.KeyMax})
defer iter.Close()

iter.Seek(k1)
if ok, _ := iter.Valid(); ok {
t.Fatalf("expected invalid iterator; found valid at key %s", iter.Key())
}

iter.Seek(k2)
if ok, _ := iter.Valid(); ok {
t.Fatalf("expected invalid iterator; found valid at key %s", iter.Key())
}
}
}

func TestSpanSetNonMVCCBatch(t *testing.T) {
defer leaktest.AfterTest(t)()
eng := engine.NewDefaultInMem()
defer eng.Close()

var ss spanset.SpanSet
ss.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")})
ss.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("f")})

rkey := engine.MakeMVCCMetadataKey(roachpb.Key("b"))
wkey := engine.MakeMVCCMetadataKey(roachpb.Key("e"))

value := []byte("value")

// Write value that we can try to read later.
if err := eng.Put(rkey, value); err != nil {
t.Fatalf("direct write failed: %+v", err)
}

batchNonMVCC := spanset.NewBatch(eng.NewBatch(), &ss)
defer batchNonMVCC.Close()

batchMVCC := spanset.NewBatchAt(eng.NewBatch(), &ss, hlc.Timestamp{WallTime: 1})
defer batchMVCC.Close()

// Writes.
for _, batch := range []engine.Batch{batchNonMVCC, batchMVCC} {
if err := batch.Put(wkey, value); err != nil {
t.Fatalf("write disallowed through non-MVCC latch: %+v", err)
}
}

// Reads.
for _, batch := range []engine.Batch{batchNonMVCC, batchMVCC} {
//lint:ignore SA1019 historical usage of deprecated batch.Get is OK
if res, err := batch.Get(rkey); err != nil {
t.Errorf("read disallowed through non-MVCC latch: %+v", err)
} else if !bytes.Equal(res, value) {
t.Errorf("failed to read previously written value, got %q", res)
}
}
}

// TestSpanSetMVCCResolveWriteIntentRangeUsingIter verifies that
// MVCCResolveWriteIntentRangeUsingIter does not stray outside of the passed-in
// key range (which it only used to do in this corner case tested here).
Expand Down Expand Up @@ -217,7 +458,7 @@ func TestSpanSetMVCCResolveWriteIntentRangeUsingIter(t *testing.T) {
}

var ss spanset.SpanSet
ss.Add(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b\x00")})
ss.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b\x00")})

batch := spanset.NewBatch(eng.NewBatch(), &ss)
defer batch.Close()
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/batcheval/cmd_begin_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func declareKeysWriteTransaction(
) {
if header.Txn != nil {
header.Txn.AssertInitialized(context.TODO())
spans.Add(spanset.SpanReadWrite, roachpb.Span{
spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{
Key: keys.TransactionKey(req.Header().Key, header.Txn.ID),
})
}
Expand All @@ -43,7 +43,7 @@ func declareKeysBeginTransaction(
desc *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
) {
declareKeysWriteTransaction(desc, header, req, spans)
spans.Add(spanset.SpanReadOnly, roachpb.Span{
spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{
Key: keys.AbortSpanKey(header.RangeID, header.Txn.ID),
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/batcheval/cmd_clear_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func declareKeysClearRange(
DefaultDeclareKeys(desc, header, req, spans)
// We look up the range descriptor key to check whether the span
// is equal to the entire range for fast stats updating.
spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)})
spans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)})
}

// ClearRange wipes all MVCC versions of keys covered by the specified
Expand Down
17 changes: 16 additions & 1 deletion pkg/storage/batcheval/cmd_delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,29 @@ package batcheval
import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func init() {
RegisterCommand(roachpb.DeleteRange, DefaultDeclareKeys, DeleteRange)
RegisterCommand(roachpb.DeleteRange, declareKeysDeleteRange, DeleteRange)
}

func declareKeysDeleteRange(
_ *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
) {
args := req.(*roachpb.DeleteRangeRequest)
access := spanset.SpanReadWrite

if args.Inline || keys.IsLocal(req.Header().Span().Key) {
spans.AddNonMVCC(access, req.Header().Span())
} else {
spans.AddMVCC(access, req.Header().Span(), header.Timestamp)
}
}

// DeleteRange deletes the range of key/value pairs specified by
Expand Down
Loading

0 comments on commit 15d0c2d

Please sign in to comment.