Skip to content

Commit

Permalink
kv/batcheval: unify TransferLease and Subsume latching
Browse files Browse the repository at this point in the history
Both of these requests want full mutual exclusion on the range, so it
makes sense that they would declare latches the same way.
  • Loading branch information
nvanbenschoten committed Feb 5, 2021
1 parent 98072ec commit 3c846bb
Show file tree
Hide file tree
Showing 20 changed files with 260 additions and 126 deletions.
14 changes: 7 additions & 7 deletions pkg/keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
// These constants are single bytes for performance. They allow single-byte
// comparisons which are considerably faster than bytes.HasPrefix.
const (
localPrefixByte = '\x01'
LocalPrefixByte = '\x01'
localMaxByte = '\x02'
meta1PrefixByte = localMaxByte
meta2PrefixByte = '\x03'
Expand All @@ -42,8 +42,8 @@ var (
// MaxKey is the infinity marker which is larger than any other key.
MaxKey = roachpb.KeyMax

// localPrefix is the prefix for all local keys.
localPrefix = roachpb.Key{localPrefixByte}
// LocalPrefix is the prefix for all local keys.
LocalPrefix = roachpb.Key{LocalPrefixByte}
// LocalMax is the end of the local key range. It is itself a global
// key.
LocalMax = roachpb.Key{localMaxByte}
Expand All @@ -64,7 +64,7 @@ var (
// metadata is identified by one of the suffixes listed below, along
// with potentially additional encoded key info, for instance in the
// case of AbortSpan entry.
LocalRangeIDPrefix = roachpb.RKey(makeKey(localPrefix, roachpb.Key("i")))
LocalRangeIDPrefix = roachpb.RKey(makeKey(LocalPrefix, roachpb.Key("i")))
// LocalRangeIDReplicatedInfix is the post-Range ID specifier for all Raft
// replicated per-range data. By appending this after the Range ID, these
// keys will be sorted directly before the local unreplicated keys for the
Expand Down Expand Up @@ -134,7 +134,7 @@ var (
// specific sort of per-range metadata is identified by one of the
// suffixes listed below, along with potentially additional encoded
// key info, such as the txn ID in the case of a transaction record.
LocalRangePrefix = roachpb.Key(makeKey(localPrefix, roachpb.RKey("k")))
LocalRangePrefix = roachpb.Key(makeKey(LocalPrefix, roachpb.RKey("k")))
LocalRangeMax = LocalRangePrefix.PrefixEnd()
// LocalQueueLastProcessedSuffix is the suffix for replica queue state keys.
LocalQueueLastProcessedSuffix = roachpb.RKey("qlpt")
Expand All @@ -148,7 +148,7 @@ var (
// 4. Store local keys
//
// LocalStorePrefix is the prefix identifying per-store data.
LocalStorePrefix = makeKey(localPrefix, roachpb.Key("s"))
LocalStorePrefix = makeKey(LocalPrefix, roachpb.Key("s"))
// localStoreSuggestedCompactionSuffix stores suggested compactions to
// be aggregated and processed on the store.
localStoreSuggestedCompactionSuffix = []byte("comp")
Expand Down Expand Up @@ -200,7 +200,7 @@ var (
// double duty as a reference to a provisional MVCC value.
// TODO(sumeer): remember to adjust this comment when adding locks of
// other strengths, or range locks.
LocalRangeLockTablePrefix = roachpb.Key(makeKey(localPrefix, roachpb.RKey("z")))
LocalRangeLockTablePrefix = roachpb.Key(makeKey(LocalPrefix, roachpb.RKey("z")))
LockTableSingleKeyInfix = []byte("k")
// LockTableSingleKeyStart is the inclusive start key of the key range
// containing single key locks.
Expand Down
4 changes: 2 additions & 2 deletions pkg/keys/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ var _ = [...]interface{}{
// `LocalRangeLockTablePrefix`.
//
// `LocalRangeIDPrefix`, `localRangePrefix`, `localStorePrefix`, and
// `LocalRangeLockTablePrefix` all in turn share `localPrefix`.
// `localPrefix` was chosen arbitrarily. Local keys would work just as well
// `LocalRangeLockTablePrefix` all in turn share `LocalPrefix`.
// `LocalPrefix` was chosen arbitrarily. Local keys would work just as well
// with a different prefix, like 0xff, or even with a suffix.

// 1. Replicated range-ID local keys: These store metadata pertaining to a
Expand Down
2 changes: 1 addition & 1 deletion pkg/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func DecodeLockTableSingleKey(key roachpb.Key) (lockedKey roachpb.Key, err error
// opposed to "user") keys, but unfortunately that name has already been
// claimed by a related (but not identical) concept.
func IsLocal(k roachpb.Key) bool {
return bytes.HasPrefix(k, localPrefix)
return bytes.HasPrefix(k, LocalPrefix)
}

// Addr returns the address for the key, used to lookup the range containing the
Expand Down
4 changes: 2 additions & 2 deletions pkg/keys/keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestKeySorting(t *testing.T) {
roachpb.RKey("\x01").Less(roachpb.RKey("\x01\x00"))) {
t.Fatalf("something is seriously wrong with this machine")
}
if bytes.Compare(localPrefix, Meta1Prefix) >= 0 {
if bytes.Compare(LocalPrefix, Meta1Prefix) >= 0 {
t.Fatalf("local key spilling into replicated ranges")
}
if !bytes.Equal(roachpb.Key(""), roachpb.Key(nil)) {
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestKeyAddressError(t *testing.T) {
RangeLastReplicaGCTimestampKey(0),
},
"local key .* malformed": {
makeKey(localPrefix, roachpb.Key("z")),
makeKey(LocalPrefix, roachpb.Key("z")),
},
}
for regexp, keyList := range testCases {
Expand Down
2 changes: 1 addition & 1 deletion pkg/keys/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var (

// KeyDict drives the pretty-printing and pretty-scanning of the key space.
KeyDict = KeyComprehensionTable{
{Name: "/Local", start: localPrefix, end: LocalMax, Entries: []DictEntry{
{Name: "/Local", start: LocalPrefix, end: LocalMax, Entries: []DictEntry{
{Name: "/Store", prefix: roachpb.Key(LocalStorePrefix),
ppFunc: localStoreKeyPrint, PSFunc: localStoreKeyParse},
{Name: "/RangeID", prefix: roachpb.Key(LocalRangeIDPrefix),
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ go_test(
"cmd_resolve_intent_test.go",
"cmd_revert_range_test.go",
"cmd_scan_test.go",
"cmd_subsume_test.go",
"cmd_truncate_log_test.go",
"declare_test.go",
"intent_test.go",
"main_test.go",
"transaction_test.go",
Expand Down
28 changes: 20 additions & 8 deletions pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package batcheval
import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -28,13 +27,26 @@ func init() {
func declareKeysTransferLease(
rs ImmutableRangeState, _ roachpb.Header, _ roachpb.Request, latchSpans, _ *spanset.SpanSet,
) {
latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeLeaseKey(rs.GetRangeID())})
latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(rs.GetStartKey())})
// Cover the entire addressable key space with a latch to prevent any writes
// from overlapping with lease transfers. In principle we could just use the
// current range descriptor (desc) but it could potentially change due to an
// as of yet unapplied merge.
latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.LocalMax, EndKey: keys.MaxKey})
// TransferLease must not run concurrently with any other request so it uses
// latches to synchronize with all other reads and writes on the outgoing
// leaseholder. Additionally, it observes the state of the timestamp cache
// and so it uses latches to wait for all in-flight requests to complete.
//
// Because of this, it declares a non-MVCC write over every addressable key
// in the range, even through the only key the TransferLease actually writes
// to is the RangeLeaseKey. This guarantees that it conflicts with any other
// request because every request must declare at least one addressable key.
//
// We could, in principle, declare these latches as MVCC writes at the time
// of the new lease. Doing so would block all concurrent writes but would
// allow reads below the new lease timestamp through. However, doing so
// would only be safe if we also accounted for clock uncertainty in all read
// latches so that any read that may need to observe state on the new
// leaseholder gets blocked. We actually already do this for transactional
// reads (see DefaultDeclareIsolatedKeys), but not for non-transactional
// reads. We'd need to be careful here, so we should only pull on this if we
// decide that doing so is important.
declareAllKeys(latchSpans)
}

// TransferLease sets the lease holder for the range.
Expand Down
25 changes: 4 additions & 21 deletions pkg/kv/kvserver/batcheval/cmd_subsume.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,10 @@ func declareKeysSubsume(
) {
// Subsume must not run concurrently with any other command. It declares a
// non-MVCC write over every addressable key in the range; this guarantees
// that it conflicts with any other command because every command must declare
// at least one addressable key. It does not, in fact, write any keys.
//
// We use the key bounds from the range descriptor in the request instead
// of the current range descriptor. Either would be fine because we verify
// that these match during the evaluation of the Subsume request.
args := req.(*roachpb.SubsumeRequest)
desc := args.RightDesc
latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{
Key: desc.StartKey.AsRawKey(),
EndKey: desc.EndKey.AsRawKey(),
})
latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{
Key: keys.MakeRangeKeyPrefix(desc.StartKey),
EndKey: keys.MakeRangeKeyPrefix(desc.EndKey).PrefixEnd(),
})
rangeIDPrefix := keys.MakeRangeIDReplicatedPrefix(desc.RangeID)
latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{
Key: rangeIDPrefix,
EndKey: rangeIDPrefix.PrefixEnd(),
})
// that it conflicts with any other command because every command must
// declare at least one addressable key. It does not, in fact, write any
// keys.
declareAllKeys(latchSpans)
}

// Subsume freezes a range for merging with its left-hand neighbor. When called
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvserver/batcheval/declare.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,20 @@ func DeclareKeysForBatch(
}
}

// declareAllKeys declares a non-MVCC write over every addressable key. This
// guarantees that the caller conflicts with any other command because every
// command must declare at least one addressable key, which is tested against
// in TestRequestsSerializeWithAllKeys.
func declareAllKeys(latchSpans *spanset.SpanSet) {
// NOTE: we don't actually know what the end key of the Range will
// be at the time of request evaluation (see ImmutableRangeState),
// so we simply declare a latch over the entire keyspace. This may
// extend beyond the Range, but this is ok for the purpose of
// acquiring latches.
latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.LocalPrefix, EndKey: keys.LocalMax})
latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.LocalMax, EndKey: keys.MaxKey})
}

// CommandArgs contains all the arguments to a command.
// TODO(bdarnell): consider merging with kvserverbase.FilterArgs (which
// would probably require removing the EvalCtx field due to import order
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,36 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// TestRequestsSerializeWithSubsume ensures that no request can be evaluated
// concurrently with a Subsume request. For more details, refer to the big
// comment block at the end of Subsume() in cmd_subsume.go.
//
// NB: This test is broader than it really needs to be. A more precise statement
// of the condition necessary to uphold the invariant mentioned in Subsume() is:
// No request that bumps the lease applied index of a range can be evaluated
// concurrently with a Subsume request.
func TestRequestsSerializeWithSubsume(t *testing.T) {
// TestRequestsSerializeWithAllKeys ensures that no request can be evaluated
// concurrently with either a Subsume request or a TransferLease request, both
// of which declare latches using declareAllKeys to guarantee mutual exclusion
// over the leaseholder.
func TestRequestsSerializeWithAllKeys(t *testing.T) {
defer leaktest.AfterTest(t)()
var subsumeLatchSpans, subsumeLockSpans, otherLatchSpans, otherLockSpans spanset.SpanSet
startKey := []byte(`a`)
endKey := []byte(`b`)
desc := &roachpb.RangeDescriptor{
RangeID: 0,
StartKey: startKey,
EndKey: endKey,
}
testTxn := &roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{
ID: uuid.FastMakeV4(),
Key: startKey,
WriteTimestamp: hlc.Timestamp{WallTime: 1},
},
Name: "test txn",
}
header := roachpb.Header{Txn: testTxn}
subsumeRequest := &roachpb.SubsumeRequest{RightDesc: *desc}
declareKeysSubsume(desc, header, subsumeRequest, &subsumeLatchSpans, &subsumeLockSpans)

var allLatchSpans spanset.SpanSet
declareAllKeys(&allLatchSpans)

for method, command := range cmds {
t.Run(method.String(), func(t *testing.T) {
var otherLatchSpans, otherLockSpans spanset.SpanSet

startKey := []byte(`a`)
endKey := []byte(`b`)
desc := &roachpb.RangeDescriptor{
RangeID: 0,
StartKey: startKey,
EndKey: endKey,
}
testTxn := &roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{
ID: uuid.FastMakeV4(),
Key: startKey,
WriteTimestamp: hlc.Timestamp{WallTime: 1},
},
Name: "test txn",
}
header := roachpb.Header{Txn: testTxn}
otherRequest := roachpb.CreateRequest(method)
if queryTxnReq, ok := otherRequest.(*roachpb.QueryTxnRequest); ok {
// QueryTxnRequest declares read-only access over the txn record of the txn
Expand All @@ -60,16 +59,15 @@ func TestRequestsSerializeWithSubsume(t *testing.T) {
// falling outside our test range's keyspace.
queryTxnReq.Txn = testTxn.TxnMeta
}

otherRequest.SetHeader(roachpb.RequestHeader{
Key: startKey,
EndKey: endKey,
Sequence: 0,
})

command.DeclareKeys(desc, header, otherRequest, &otherLatchSpans, &otherLockSpans)
if !subsumeLatchSpans.Intersects(&otherLatchSpans) {
t.Errorf("%s does not serialize with Subsume", method)
if !allLatchSpans.Intersects(&otherLatchSpans) {
t.Errorf("%s does not serialize with declareAllKeys", method)
}
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1457,7 +1457,7 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) {

// Manually heartbeat the liveness on the first store to ensure it's
// considered live. The automatic heartbeat might not come for a while.
require.NoError(t, tc.HeartbeatLiveness(ctx, 0))
require.NoError(t, tc.HeartbeatNodeLiveness(0))

// Send several get and put requests to the RHS. The first of these to
// arrive will acquire the lease; the remaining requests will wait for that
Expand Down
29 changes: 23 additions & 6 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3353,12 +3353,15 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

// Use a manual clock so we can efficiently force leases to expire.
// Required by TestCluster.MoveRangeLeaseNonCooperatively.
manual := hlc.NewHybridManualClock()

// Detect when txn2 has completed its read of txn1's intent and block.
var txn2ID atomic.Value
var txn2BBlockOnce sync.Once
txn2BlockedC := make(chan chan struct{})
knobs := &kvserver.StoreTestingKnobs{}
knobs.EvalKnobs.TestingPostEvalFilter = func(args kvserverbase.FilterArgs) *roachpb.Error {
postEvalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error {
if txn := args.Hdr.Txn; txn != nil && txn.ID == txn2ID.Load() {
txn2BBlockOnce.Do(func() {
if !errors.HasType(args.Err, (*roachpb.WriteIntentError)(nil)) {
Expand All @@ -3376,7 +3379,7 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) {
// Detect when txn4 discovers txn3's intent and begins to push.
var txn4ID atomic.Value
txn4PushingC := make(chan struct{}, 1)
knobs.TestingRequestFilter = func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
requestFilter := func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
if !ba.IsSinglePushTxnRequest() {
return nil
}
Expand All @@ -3390,7 +3393,19 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) {
}

tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: knobs}},
ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
TestingPostEvalFilter: postEvalFilter,
},
TestingRequestFilter: requestFilter,
// Required by TestCluster.MoveRangeLeaseNonCooperatively.
AllowLeaseRequestProposalsWhenNotLeader: true,
},
Server: &server.TestingKnobs{
ClockSource: manual.UnixNano,
},
}},
})
defer tc.Stopper().Stop(ctx)
kvDB := tc.Servers[0].DB()
Expand Down Expand Up @@ -3418,8 +3433,10 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) {
}()
txn2UnblockC := <-txn2BlockedC

// Transfer the lease to Server 1.
err = tc.TransferRangeLease(rangeDesc, tc.Target(1))
// Transfer the lease to Server 1. Do so non-cooperatively instead of using
// a lease transfer, because the cooperative lease transfer would get stuck
// acquiring latches, which are held by txn2.
err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual)
require.NoError(t, err)

// Roll back txn1.
Expand Down
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ var allSpans = func() spanset.SpanSet {
Key: roachpb.KeyMin,
EndKey: roachpb.KeyMax,
})
// Local keys (see `keys.localPrefix`).
// Local keys (see `keys.LocalPrefix`).
ss.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{
Key: append([]byte("\x01"), roachpb.KeyMin...),
EndKey: append([]byte("\x01"), roachpb.KeyMax...),
Expand Down Expand Up @@ -1458,12 +1458,10 @@ func TestReplicaDrainLease(t *testing.T) {
// expired already.

// Stop n1's heartbeats and wait for the lease to expire.

log.Infof(ctx, "test: suspending heartbeats for n1")
cleanup := s1.NodeLiveness().(*liveness.NodeLiveness).PauseAllHeartbeatsForTest()
defer cleanup()

require.NoError(t, err)
testutils.SucceedsSoon(t, func() error {
status := r1.CurrentLeaseStatus(ctx)
require.True(t, status.Lease.OwnedBy(store1.StoreID()), "someone else got the lease: %s", status)
Expand Down
Loading

0 comments on commit 3c846bb

Please sign in to comment.