From 3c846bb486d2d86f4d661d87d6b592bd6e3ba3bb Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 12 Jan 2021 23:51:43 -0500 Subject: [PATCH] kv/batcheval: unify TransferLease and Subsume latching Both of these requests want full mutual exclusion on the range, so it makes sense that they would declare latches the same way. --- pkg/keys/constants.go | 14 +- pkg/keys/doc.go | 4 +- pkg/keys/keys.go | 2 +- pkg/keys/keys_test.go | 4 +- pkg/keys/printer.go | 2 +- pkg/kv/kvserver/batcheval/BUILD.bazel | 2 +- .../kvserver/batcheval/cmd_lease_transfer.go | 28 ++-- pkg/kv/kvserver/batcheval/cmd_subsume.go | 25 +--- pkg/kv/kvserver/batcheval/declare.go | 14 ++ .../{cmd_subsume_test.go => declare_test.go} | 60 ++++---- pkg/kv/kvserver/client_merge_test.go | 2 +- pkg/kv/kvserver/client_replica_test.go | 29 +++- pkg/kv/kvserver/replica_test.go | 4 +- pkg/server/testserver.go | 23 +++ pkg/storage/enginepb/mvcc.pb.go | 16 +-- pkg/storage/enginepb/mvcc.proto | 2 +- .../serverutils/test_cluster_shim.go | 16 +++ pkg/testutils/serverutils/test_server_shim.go | 3 + pkg/testutils/testcluster/BUILD.bazel | 2 +- pkg/testutils/testcluster/testcluster.go | 134 +++++++++++++----- 20 files changed, 260 insertions(+), 126 deletions(-) rename pkg/kv/kvserver/batcheval/{cmd_subsume_test.go => declare_test.go} (54%) diff --git a/pkg/keys/constants.go b/pkg/keys/constants.go index d4f8c978accf..150372342a4e 100644 --- a/pkg/keys/constants.go +++ b/pkg/keys/constants.go @@ -22,7 +22,7 @@ import ( // These constants are single bytes for performance. They allow single-byte // comparisons which are considerably faster than bytes.HasPrefix. const ( - localPrefixByte = '\x01' + LocalPrefixByte = '\x01' localMaxByte = '\x02' meta1PrefixByte = localMaxByte meta2PrefixByte = '\x03' @@ -42,8 +42,8 @@ var ( // MaxKey is the infinity marker which is larger than any other key. MaxKey = roachpb.KeyMax - // localPrefix is the prefix for all local keys. 
- localPrefix = roachpb.Key{localPrefixByte} + // LocalPrefix is the prefix for all local keys. + LocalPrefix = roachpb.Key{LocalPrefixByte} // LocalMax is the end of the local key range. It is itself a global // key. LocalMax = roachpb.Key{localMaxByte} @@ -64,7 +64,7 @@ var ( // metadata is identified by one of the suffixes listed below, along // with potentially additional encoded key info, for instance in the // case of AbortSpan entry. - LocalRangeIDPrefix = roachpb.RKey(makeKey(localPrefix, roachpb.Key("i"))) + LocalRangeIDPrefix = roachpb.RKey(makeKey(LocalPrefix, roachpb.Key("i"))) // LocalRangeIDReplicatedInfix is the post-Range ID specifier for all Raft // replicated per-range data. By appending this after the Range ID, these // keys will be sorted directly before the local unreplicated keys for the @@ -134,7 +134,7 @@ var ( // specific sort of per-range metadata is identified by one of the // suffixes listed below, along with potentially additional encoded // key info, such as the txn ID in the case of a transaction record. - LocalRangePrefix = roachpb.Key(makeKey(localPrefix, roachpb.RKey("k"))) + LocalRangePrefix = roachpb.Key(makeKey(LocalPrefix, roachpb.RKey("k"))) LocalRangeMax = LocalRangePrefix.PrefixEnd() // LocalQueueLastProcessedSuffix is the suffix for replica queue state keys. LocalQueueLastProcessedSuffix = roachpb.RKey("qlpt") @@ -148,7 +148,7 @@ var ( // 4. Store local keys // // LocalStorePrefix is the prefix identifying per-store data. - LocalStorePrefix = makeKey(localPrefix, roachpb.Key("s")) + LocalStorePrefix = makeKey(LocalPrefix, roachpb.Key("s")) // localStoreSuggestedCompactionSuffix stores suggested compactions to // be aggregated and processed on the store. localStoreSuggestedCompactionSuffix = []byte("comp") @@ -200,7 +200,7 @@ var ( // double duty as a reference to a provisional MVCC value. // TODO(sumeer): remember to adjust this comment when adding locks of // other strengths, or range locks. 
- LocalRangeLockTablePrefix = roachpb.Key(makeKey(localPrefix, roachpb.RKey("z"))) + LocalRangeLockTablePrefix = roachpb.Key(makeKey(LocalPrefix, roachpb.RKey("z"))) LockTableSingleKeyInfix = []byte("k") // LockTableSingleKeyStart is the inclusive start key of the key range // containing single key locks. diff --git a/pkg/keys/doc.go b/pkg/keys/doc.go index 3147014c29a3..c2a94ba441d2 100644 --- a/pkg/keys/doc.go +++ b/pkg/keys/doc.go @@ -177,8 +177,8 @@ var _ = [...]interface{}{ // `LocalRangeLockTablePrefix`. // // `LocalRangeIDPrefix`, `localRangePrefix`, `localStorePrefix`, and - // `LocalRangeLockTablePrefix` all in turn share `localPrefix`. - // `localPrefix` was chosen arbitrarily. Local keys would work just as well + // `LocalRangeLockTablePrefix` all in turn share `LocalPrefix`. + // `LocalPrefix` was chosen arbitrarily. Local keys would work just as well // with a different prefix, like 0xff, or even with a suffix. // 1. Replicated range-ID local keys: These store metadata pertaining to a diff --git a/pkg/keys/keys.go b/pkg/keys/keys.go index 00e614bcfb88..8fc8d3b175f4 100644 --- a/pkg/keys/keys.go +++ b/pkg/keys/keys.go @@ -478,7 +478,7 @@ func DecodeLockTableSingleKey(key roachpb.Key) (lockedKey roachpb.Key, err error // opposed to "user") keys, but unfortunately that name has already been // claimed by a related (but not identical) concept. 
func IsLocal(k roachpb.Key) bool { - return bytes.HasPrefix(k, localPrefix) + return bytes.HasPrefix(k, LocalPrefix) } // Addr returns the address for the key, used to lookup the range containing the diff --git a/pkg/keys/keys_test.go b/pkg/keys/keys_test.go index 0eb436ffab6f..64453a03b628 100644 --- a/pkg/keys/keys_test.go +++ b/pkg/keys/keys_test.go @@ -66,7 +66,7 @@ func TestKeySorting(t *testing.T) { roachpb.RKey("\x01").Less(roachpb.RKey("\x01\x00"))) { t.Fatalf("something is seriously wrong with this machine") } - if bytes.Compare(localPrefix, Meta1Prefix) >= 0 { + if bytes.Compare(LocalPrefix, Meta1Prefix) >= 0 { t.Fatalf("local key spilling into replicated ranges") } if !bytes.Equal(roachpb.Key(""), roachpb.Key(nil)) { @@ -140,7 +140,7 @@ func TestKeyAddressError(t *testing.T) { RangeLastReplicaGCTimestampKey(0), }, "local key .* malformed": { - makeKey(localPrefix, roachpb.Key("z")), + makeKey(LocalPrefix, roachpb.Key("z")), }, } for regexp, keyList := range testCases { diff --git a/pkg/keys/printer.go b/pkg/keys/printer.go index 28fba6fd6d2e..2ac5c3b8bdfb 100644 --- a/pkg/keys/printer.go +++ b/pkg/keys/printer.go @@ -70,7 +70,7 @@ var ( // KeyDict drives the pretty-printing and pretty-scanning of the key space. 
KeyDict = KeyComprehensionTable{ - {Name: "/Local", start: localPrefix, end: LocalMax, Entries: []DictEntry{ + {Name: "/Local", start: LocalPrefix, end: LocalMax, Entries: []DictEntry{ {Name: "/Store", prefix: roachpb.Key(LocalStorePrefix), ppFunc: localStoreKeyPrint, PSFunc: localStoreKeyParse}, {Name: "/RangeID", prefix: roachpb.Key(LocalRangeIDPrefix), diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index 74533bf6b386..8b2bc863b321 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -92,8 +92,8 @@ go_test( "cmd_resolve_intent_test.go", "cmd_revert_range_test.go", "cmd_scan_test.go", - "cmd_subsume_test.go", "cmd_truncate_log_test.go", + "declare_test.go", "intent_test.go", "main_test.go", "transaction_test.go", diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go b/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go index 3f17dbcb4468..0bf25b973dbf 100644 --- a/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go +++ b/pkg/kv/kvserver/batcheval/cmd_lease_transfer.go @@ -13,7 +13,6 @@ package batcheval import ( "context" - "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -28,13 +27,26 @@ func init() { func declareKeysTransferLease( rs ImmutableRangeState, _ roachpb.Header, _ roachpb.Request, latchSpans, _ *spanset.SpanSet, ) { - latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeLeaseKey(rs.GetRangeID())}) - latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(rs.GetStartKey())}) - // Cover the entire addressable key space with a latch to prevent any writes - // from overlapping with lease transfers. In principle we could just use the - // current range descriptor (desc) but it could potentially change due to an - // as of yet unapplied merge. 
- latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.LocalMax, EndKey: keys.MaxKey}) + // TransferLease must not run concurrently with any other request so it uses + // latches to synchronize with all other reads and writes on the outgoing + // leaseholder. Additionally, it observes the state of the timestamp cache + // and so it uses latches to wait for all in-flight requests to complete. + // + // Because of this, it declares a non-MVCC write over every addressable key + // in the range, even though the only key the TransferLease actually writes + // to is the RangeLeaseKey. This guarantees that it conflicts with any other + // request because every request must declare at least one addressable key. + // + // We could, in principle, declare these latches as MVCC writes at the time + // of the new lease. Doing so would block all concurrent writes but would + // allow reads below the new lease timestamp through. However, doing so + // would only be safe if we also accounted for clock uncertainty in all read + // latches so that any read that may need to observe state on the new + // leaseholder gets blocked. We actually already do this for transactional + // reads (see DefaultDeclareIsolatedKeys), but not for non-transactional + // reads. We'd need to be careful here, so we should only pull on this if we + // decide that doing so is important. + declareAllKeys(latchSpans) } // TransferLease sets the lease holder for the range. diff --git a/pkg/kv/kvserver/batcheval/cmd_subsume.go b/pkg/kv/kvserver/batcheval/cmd_subsume.go index 80de40328605..cc7b44015ff1 100644 --- a/pkg/kv/kvserver/batcheval/cmd_subsume.go +++ b/pkg/kv/kvserver/batcheval/cmd_subsume.go @@ -32,27 +32,10 @@ func declareKeysSubsume( ) { // Subsume must not run concurrently with any other command.
It declares a // non-MVCC write over every addressable key in the range; this guarantees - // that it conflicts with any other command because every command must declare - // at least one addressable key. It does not, in fact, write any keys. - // - // We use the key bounds from the range descriptor in the request instead - // of the current range descriptor. Either would be fine because we verify - // that these match during the evaluation of the Subsume request. - args := req.(*roachpb.SubsumeRequest) - desc := args.RightDesc - latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{ - Key: desc.StartKey.AsRawKey(), - EndKey: desc.EndKey.AsRawKey(), - }) - latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{ - Key: keys.MakeRangeKeyPrefix(desc.StartKey), - EndKey: keys.MakeRangeKeyPrefix(desc.EndKey).PrefixEnd(), - }) - rangeIDPrefix := keys.MakeRangeIDReplicatedPrefix(desc.RangeID) - latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{ - Key: rangeIDPrefix, - EndKey: rangeIDPrefix.PrefixEnd(), - }) + // that it conflicts with any other command because every command must + // declare at least one addressable key. It does not, in fact, write any + // keys. + declareAllKeys(latchSpans) } // Subsume freezes a range for merging with its left-hand neighbor. When called diff --git a/pkg/kv/kvserver/batcheval/declare.go b/pkg/kv/kvserver/batcheval/declare.go index 21341482e468..d24224ab9219 100644 --- a/pkg/kv/kvserver/batcheval/declare.go +++ b/pkg/kv/kvserver/batcheval/declare.go @@ -82,6 +82,20 @@ func DeclareKeysForBatch( } } +// declareAllKeys declares a non-MVCC write over every addressable key. This +// guarantees that the caller conflicts with any other command because every +// command must declare at least one addressable key, which is tested against +// in TestRequestsSerializeWithAllKeys. 
+func declareAllKeys(latchSpans *spanset.SpanSet) { + // NOTE: we don't actually know what the end key of the Range will + // be at the time of request evaluation (see ImmutableRangeState), + // so we simply declare a latch over the entire keyspace. This may + // extend beyond the Range, but this is ok for the purpose of + // acquiring latches. + latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.LocalPrefix, EndKey: keys.LocalMax}) + latchSpans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{Key: keys.LocalMax, EndKey: keys.MaxKey}) +} + // CommandArgs contains all the arguments to a command. // TODO(bdarnell): consider merging with kvserverbase.FilterArgs (which // would probably require removing the EvalCtx field due to import order diff --git a/pkg/kv/kvserver/batcheval/cmd_subsume_test.go b/pkg/kv/kvserver/batcheval/declare_test.go similarity index 54% rename from pkg/kv/kvserver/batcheval/cmd_subsume_test.go rename to pkg/kv/kvserver/batcheval/declare_test.go index 1ce40c608796..409418806ea5 100644 --- a/pkg/kv/kvserver/batcheval/cmd_subsume_test.go +++ b/pkg/kv/kvserver/batcheval/declare_test.go @@ -21,37 +21,36 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" ) -// TestRequestsSerializeWithSubsume ensures that no request can be evaluated -// concurrently with a Subsume request. For more details, refer to the big -// comment block at the end of Subsume() in cmd_subsume.go. -// -// NB: This test is broader than it really needs to be. A more precise statement -// of the condition necessary to uphold the invariant mentioned in Subsume() is: -// No request that bumps the lease applied index of a range can be evaluated -// concurrently with a Subsume request. 
-func TestRequestsSerializeWithSubsume(t *testing.T) { +// TestRequestsSerializeWithAllKeys ensures that no request can be evaluated +// concurrently with either a Subsume request or a TransferLease request, both +// of which declare latches using declareAllKeys to guarantee mutual exclusion +// over the leaseholder. +func TestRequestsSerializeWithAllKeys(t *testing.T) { defer leaktest.AfterTest(t)() - var subsumeLatchSpans, subsumeLockSpans, otherLatchSpans, otherLockSpans spanset.SpanSet - startKey := []byte(`a`) - endKey := []byte(`b`) - desc := &roachpb.RangeDescriptor{ - RangeID: 0, - StartKey: startKey, - EndKey: endKey, - } - testTxn := &roachpb.Transaction{ - TxnMeta: enginepb.TxnMeta{ - ID: uuid.FastMakeV4(), - Key: startKey, - WriteTimestamp: hlc.Timestamp{WallTime: 1}, - }, - Name: "test txn", - } - header := roachpb.Header{Txn: testTxn} - subsumeRequest := &roachpb.SubsumeRequest{RightDesc: *desc} - declareKeysSubsume(desc, header, subsumeRequest, &subsumeLatchSpans, &subsumeLockSpans) + + var allLatchSpans spanset.SpanSet + declareAllKeys(&allLatchSpans) + for method, command := range cmds { t.Run(method.String(), func(t *testing.T) { + var otherLatchSpans, otherLockSpans spanset.SpanSet + + startKey := []byte(`a`) + endKey := []byte(`b`) + desc := &roachpb.RangeDescriptor{ + RangeID: 0, + StartKey: startKey, + EndKey: endKey, + } + testTxn := &roachpb.Transaction{ + TxnMeta: enginepb.TxnMeta{ + ID: uuid.FastMakeV4(), + Key: startKey, + WriteTimestamp: hlc.Timestamp{WallTime: 1}, + }, + Name: "test txn", + } + header := roachpb.Header{Txn: testTxn} otherRequest := roachpb.CreateRequest(method) if queryTxnReq, ok := otherRequest.(*roachpb.QueryTxnRequest); ok { // QueryTxnRequest declares read-only access over the txn record of the txn @@ -60,7 +59,6 @@ func TestRequestsSerializeWithSubsume(t *testing.T) { // falling outside our test range's keyspace. 
queryTxnReq.Txn = testTxn.TxnMeta } - otherRequest.SetHeader(roachpb.RequestHeader{ Key: startKey, EndKey: endKey, @@ -68,8 +66,8 @@ func TestRequestsSerializeWithSubsume(t *testing.T) { }) command.DeclareKeys(desc, header, otherRequest, &otherLatchSpans, &otherLockSpans) - if !subsumeLatchSpans.Intersects(&otherLatchSpans) { - t.Errorf("%s does not serialize with Subsume", method) + if !allLatchSpans.Intersects(&otherLatchSpans) { + t.Errorf("%s does not serialize with declareAllKeys", method) } }) } diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index d38e9dc0efc8..0d4bf1a04ac8 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -1457,7 +1457,7 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) { // Manually heartbeat the liveness on the first store to ensure it's // considered live. The automatic heartbeat might not come for a while. - require.NoError(t, tc.HeartbeatLiveness(ctx, 0)) + require.NoError(t, tc.HeartbeatNodeLiveness(0)) // Send several get and put requests to the RHS. The first of these to // arrive will acquire the lease; the remaining requests will wait for that diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index fddc0505807c..c860c1a696bd 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -3353,12 +3353,15 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() + // Use a manual clock so we can efficiently force leases to expire. + // Required by TestCluster.MoveRangeLeaseNonCooperatively. + manual := hlc.NewHybridManualClock() + // Detect when txn2 has completed its read of txn1's intent and block. 
var txn2ID atomic.Value var txn2BBlockOnce sync.Once txn2BlockedC := make(chan chan struct{}) - knobs := &kvserver.StoreTestingKnobs{} - knobs.EvalKnobs.TestingPostEvalFilter = func(args kvserverbase.FilterArgs) *roachpb.Error { + postEvalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error { if txn := args.Hdr.Txn; txn != nil && txn.ID == txn2ID.Load() { txn2BBlockOnce.Do(func() { if !errors.HasType(args.Err, (*roachpb.WriteIntentError)(nil)) { @@ -3376,7 +3379,7 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) { // Detect when txn4 discovers txn3's intent and begins to push. var txn4ID atomic.Value txn4PushingC := make(chan struct{}, 1) - knobs.TestingRequestFilter = func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error { + requestFilter := func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error { if !ba.IsSinglePushTxnRequest() { return nil } @@ -3390,7 +3393,19 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) { } tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: knobs}}, + ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ + TestingPostEvalFilter: postEvalFilter, + }, + TestingRequestFilter: requestFilter, + // Required by TestCluster.MoveRangeLeaseNonCooperatively. + AllowLeaseRequestProposalsWhenNotLeader: true, + }, + Server: &server.TestingKnobs{ + ClockSource: manual.UnixNano, + }, + }}, }) defer tc.Stopper().Stop(ctx) kvDB := tc.Servers[0].DB() @@ -3418,8 +3433,10 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) { }() txn2UnblockC := <-txn2BlockedC - // Transfer the lease to Server 1. - err = tc.TransferRangeLease(rangeDesc, tc.Target(1)) + // Transfer the lease to Server 1. 
Do so non-cooperatively instead of using + // a lease transfer, because the cooperative lease transfer would get stuck + // acquiring latches, which are held by txn2. + err = tc.MoveRangeLeaseNonCooperatively(rangeDesc, tc.Target(1), manual) require.NoError(t, err) // Roll back txn1. diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index ed65c2a4c2a5..88de17845453 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -86,7 +86,7 @@ var allSpans = func() spanset.SpanSet { Key: roachpb.KeyMin, EndKey: roachpb.KeyMax, }) - // Local keys (see `keys.localPrefix`). + // Local keys (see `keys.LocalPrefix`). ss.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{ Key: append([]byte("\x01"), roachpb.KeyMin...), EndKey: append([]byte("\x01"), roachpb.KeyMax...), @@ -1458,12 +1458,10 @@ func TestReplicaDrainLease(t *testing.T) { // expired already. // Stop n1's heartbeats and wait for the lease to expire. - log.Infof(ctx, "test: suspending heartbeats for n1") cleanup := s1.NodeLiveness().(*liveness.NodeLiveness).PauseAllHeartbeatsForTest() defer cleanup() - require.NoError(t, err) testutils.SucceedsSoon(t, func() error { status := r1.CurrentLeaseStatus(ctx) require.True(t, status.Lease.OwnedBy(store1.StoreID()), "someone else got the lease: %s", status) diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 382246545bc2..bc785b8c018e 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptprovider" @@ -63,6 +64,7 @@ import ( 
"github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/netutil" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -371,6 +373,27 @@ func (ts *TestServer) NodeLiveness() interface{} { return nil } +// HeartbeatNodeLiveness heartbeats the server's NodeLiveness record. +func (ts *TestServer) HeartbeatNodeLiveness() error { + if ts == nil { + return errors.New("no node liveness instance") + } + nl := ts.nodeLiveness + l, ok := nl.Self() + if !ok { + return errors.New("liveness not found") + } + + var err error + ctx := context.Background() + for r := retry.StartWithCtx(ctx, retry.Options{MaxRetries: 5}); r.Next(); { + if err = nl.Heartbeat(ctx, l); !errors.Is(err, liveness.ErrEpochIncremented) { + break + } + } + return err +} + // RPCContext returns the rpc context used by the TestServer. 
func (ts *TestServer) RPCContext() *rpc.Context { if ts != nil { diff --git a/pkg/storage/enginepb/mvcc.pb.go b/pkg/storage/enginepb/mvcc.pb.go index aafef5ffdd5a..a36775f8a176 100644 --- a/pkg/storage/enginepb/mvcc.pb.go +++ b/pkg/storage/enginepb/mvcc.pb.go @@ -78,7 +78,7 @@ type MVCCMetadata struct { func (m *MVCCMetadata) Reset() { *m = MVCCMetadata{} } func (*MVCCMetadata) ProtoMessage() {} func (*MVCCMetadata) Descriptor() ([]byte, []int) { - return fileDescriptor_mvcc_69eca97822587899, []int{0} + return fileDescriptor_mvcc_33f4aa682d7df09c, []int{0} } func (m *MVCCMetadata) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -118,7 +118,7 @@ type MVCCMetadata_SequencedIntent struct { func (m *MVCCMetadata_SequencedIntent) Reset() { *m = MVCCMetadata_SequencedIntent{} } func (*MVCCMetadata_SequencedIntent) ProtoMessage() {} func (*MVCCMetadata_SequencedIntent) Descriptor() ([]byte, []int) { - return fileDescriptor_mvcc_69eca97822587899, []int{0, 0} + return fileDescriptor_mvcc_33f4aa682d7df09c, []int{0, 0} } func (m *MVCCMetadata_SequencedIntent) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -162,7 +162,7 @@ func (m *MVCCMetadataSubsetForMergeSerialization) Reset() { } func (*MVCCMetadataSubsetForMergeSerialization) ProtoMessage() {} func (*MVCCMetadataSubsetForMergeSerialization) Descriptor() ([]byte, []int) { - return fileDescriptor_mvcc_69eca97822587899, []int{1} + return fileDescriptor_mvcc_33f4aa682d7df09c, []int{1} } func (m *MVCCMetadataSubsetForMergeSerialization) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -278,7 +278,7 @@ type MVCCStats struct { // This tracks the same quantity as (key_bytes + val_bytes), but // for system-local metadata keys (which aren't counted in either // key_bytes or val_bytes). 
Each of the keys falling into this group - // is documented in keys/constants.go under the localPrefix constant + // is documented in keys/constants.go under the LocalPrefix constant // and is prefixed by either LocalRangeIDPrefix or LocalRangePrefix. SysBytes int64 `protobuf:"fixed64,12,opt,name=sys_bytes,json=sysBytes" json:"sys_bytes"` // sys_count is the number of meta keys tracked under sys_bytes. @@ -292,7 +292,7 @@ func (m *MVCCStats) Reset() { *m = MVCCStats{} } func (m *MVCCStats) String() string { return proto.CompactTextString(m) } func (*MVCCStats) ProtoMessage() {} func (*MVCCStats) Descriptor() ([]byte, []int) { - return fileDescriptor_mvcc_69eca97822587899, []int{2} + return fileDescriptor_mvcc_33f4aa682d7df09c, []int{2} } func (m *MVCCStats) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -346,7 +346,7 @@ func (m *MVCCStatsLegacyRepresentation) Reset() { *m = MVCCStatsLegacyRe func (m *MVCCStatsLegacyRepresentation) String() string { return proto.CompactTextString(m) } func (*MVCCStatsLegacyRepresentation) ProtoMessage() {} func (*MVCCStatsLegacyRepresentation) Descriptor() ([]byte, []int) { - return fileDescriptor_mvcc_69eca97822587899, []int{3} + return fileDescriptor_mvcc_33f4aa682d7df09c, []int{3} } func (m *MVCCStatsLegacyRepresentation) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -2289,9 +2289,9 @@ var ( ErrIntOverflowMvcc = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("storage/enginepb/mvcc.proto", fileDescriptor_mvcc_69eca97822587899) } +func init() { proto.RegisterFile("storage/enginepb/mvcc.proto", fileDescriptor_mvcc_33f4aa682d7df09c) } -var fileDescriptor_mvcc_69eca97822587899 = []byte{ +var fileDescriptor_mvcc_33f4aa682d7df09c = []byte{ // 780 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x95, 0x31, 0x4f, 0xdb, 0x4c, 0x18, 0xc7, 0xe3, 0x37, 0x01, 0x9c, 0x4b, 0x48, 0xc0, 0x2f, 0xd2, 0x1b, 0x85, 0xb7, 0x4e, 0x0a, diff --git 
a/pkg/storage/enginepb/mvcc.proto b/pkg/storage/enginepb/mvcc.proto index c6cc5fd9685b..f140e65c2c2e 100644 --- a/pkg/storage/enginepb/mvcc.proto +++ b/pkg/storage/enginepb/mvcc.proto @@ -200,7 +200,7 @@ message MVCCStats { // This tracks the same quantity as (key_bytes + val_bytes), but // for system-local metadata keys (which aren't counted in either // key_bytes or val_bytes). Each of the keys falling into this group - // is documented in keys/constants.go under the localPrefix constant + // is documented in keys/constants.go under the LocalPrefix constant // and is prefixed by either LocalRangeIDPrefix or LocalRangePrefix. optional sfixed64 sys_bytes = 12 [(gogoproto.nullable) = false]; // sys_count is the number of meta keys tracked under sys_bytes. diff --git a/pkg/testutils/serverutils/test_cluster_shim.go b/pkg/testutils/serverutils/test_cluster_shim.go index ea334f956cfd..db2a8e062d87 100644 --- a/pkg/testutils/serverutils/test_cluster_shim.go +++ b/pkg/testutils/serverutils/test_cluster_shim.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/stop" ) @@ -129,6 +130,21 @@ type TestClusterInterface interface { rangeDesc roachpb.RangeDescriptor, dest roachpb.ReplicationTarget, ) error + // MoveRangeLeaseNonCooperatively performs a non-cooperative transfer of the + // lease for a range from whoever has it to a particular store. That store + // must already have a replica of the range. If that replica already has the + // (active) lease, this method is a no-op. + // + // The transfer is non-cooperative in that the lease is made to expire by + // advancing the manual clock. The target is then instructed to acquire the + // ownerless lease. Most tests should use the cooperative version of this + // method, TransferRangeLease. 
+ MoveRangeLeaseNonCooperatively( + rangeDesc roachpb.RangeDescriptor, + dest roachpb.ReplicationTarget, + manual *hlc.HybridManualClock, + ) error + // LookupRange returns the descriptor of the range containing key. LookupRange(key roachpb.Key) (roachpb.RangeDescriptor, error) diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index fa047022bf69..24fd40eba088 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -128,6 +128,9 @@ type TestServerInterface interface { // interface{}. NodeLiveness() interface{} + // HeartbeatNodeLiveness heartbeats the server's NodeLiveness record. + HeartbeatNodeLiveness() error + // SetDistSQLSpanResolver changes the SpanResolver used for DistSQL inside the // server's executor. The argument must be a physicalplan.SpanResolver // instance. diff --git a/pkg/testutils/testcluster/BUILD.bazel b/pkg/testutils/testcluster/BUILD.bazel index a004bd8d5ef2..8cebf296af56 100644 --- a/pkg/testutils/testcluster/BUILD.bazel +++ b/pkg/testutils/testcluster/BUILD.bazel @@ -9,8 +9,8 @@ go_library( "//pkg/base", "//pkg/gossip", "//pkg/keys", + "//pkg/kv", "//pkg/kv/kvserver", - "//pkg/kv/kvserver/liveness", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/roachpb", "//pkg/rpc", diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index dcebe198d824..0ab8c792263b 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -23,8 +23,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" 
"github.com/cockroachdb/cockroach/pkg/rpc" @@ -771,7 +771,7 @@ func (tc *TestCluster) RemoveNonVotersOrFatal( return desc } -// TransferRangeLease is part of the TestServerInterface. +// TransferRangeLease is part of the TestClusterInterface. func (tc *TestCluster) TransferRangeLease( rangeDesc roachpb.RangeDescriptor, dest roachpb.ReplicationTarget, ) error { @@ -792,6 +792,88 @@ func (tc *TestCluster) TransferRangeLeaseOrFatal( } } +// MoveRangeLeaseNonCooperatively is part of the TestClusterInterface. +func (tc *TestCluster) MoveRangeLeaseNonCooperatively( + rangeDesc roachpb.RangeDescriptor, dest roachpb.ReplicationTarget, manual *hlc.HybridManualClock, +) error { + knobs := tc.clusterArgs.ServerArgs.Knobs.Store.(*kvserver.StoreTestingKnobs) + if !knobs.AllowLeaseRequestProposalsWhenNotLeader { + // Without this knob, we'd have to architect a Raft leadership change + // too in order to let the replica get the lease. It's easier to just + // require that callers set it. + return errors.Errorf("must set StoreTestingKnobs.AllowLeaseRequestProposalsWhenNotLeader") + } + + destServer, err := tc.findMemberServer(dest.StoreID) + if err != nil { + return err + } + destStore, err := destServer.Stores().GetStore(dest.StoreID) + if err != nil { + return err + } + + // We are going to advance the manual clock so that the current lease + // expires and then issue a request to the target in hopes that it grabs the + // lease. But it is possible that another replica grabs the lease before us + // when it's up for grabs. To handle that case, we wrap the entire operation + // in an outer retry loop. + const retryDur = testutils.DefaultSucceedsSoonDuration + return retry.ForDuration(retryDur, func() error { + // Find the current lease. + prevLease, _, err := tc.FindRangeLease(rangeDesc, nil /* hint */) + if err != nil { + return err + } + if prevLease.Replica.StoreID == dest.StoreID { + return nil + } + + // Advance the manual clock past the lease's expiration. 
+ lhStore, err := tc.findMemberStore(prevLease.Replica.StoreID) + if err != nil { + return err + } + manual.Increment(lhStore.GetStoreConfig().LeaseExpiration()) + + // Heartbeat the destination server's liveness record so that if we are + // attempting to acquire an epoch-based lease, the server will be live. + err = destServer.HeartbeatNodeLiveness() + if err != nil { + return err + } + + // Issue a request to the target replica, which should notice that the + // old lease has expired and that it can acquire the lease. + var newLease *roachpb.Lease + ctx := context.Background() + req := &roachpb.LeaseInfoRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: rangeDesc.StartKey.AsRawKey(), + }, + } + h := roachpb.Header{RangeID: rangeDesc.RangeID} + reply, pErr := kv.SendWrappedWith(ctx, destStore, h, req) + if pErr != nil { + log.Infof(ctx, "LeaseInfoRequest failed: %v", pErr) + if lErr, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok && lErr.Lease != nil { + newLease = lErr.Lease + } else { + return pErr.GoError() + } + } else { + newLease = &reply.(*roachpb.LeaseInfoResponse).Lease + } + + // Is the lease in the right place? + if newLease.Replica.StoreID != dest.StoreID { + return errors.Errorf("LeaseInfoRequest succeeded, "+ + "but lease in wrong location, want %v, got %v", dest, newLease.Replica) + } + return nil + }) +} + // FindRangeLease is similar to FindRangeLeaseHolder but returns a Lease proto // without verifying if the lease is still active. Instead, it returns a time- // stamp taken off the queried node's clock. @@ -812,15 +894,9 @@ func (tc *TestCluster) FindRangeLease( // Find the server indicated by the hint and send a LeaseInfoRequest through // it. 
- var hintServer *server.TestServer - for _, s := range tc.Servers { - if s.GetNode().Descriptor.NodeID == hint.NodeID { - hintServer = s - break - } - } - if hintServer == nil { - return roachpb.Lease{}, hlc.ClockTimestamp{}, errors.Errorf("bad hint: %+v; no such node", hint) + hintServer, err := tc.findMemberServer(hint.StoreID) + if err != nil { + return roachpb.Lease{}, hlc.ClockTimestamp{}, errors.Wrapf(err, "bad hint: %+v; no such node", hint) } return hintServer.GetRangeLease(context.TODO(), rangeDesc.StartKey.AsRawKey()) @@ -910,20 +986,25 @@ func (tc *TestCluster) WaitForSplitAndInitialization(startKey roachpb.Key) error }) } -// findMemberStore returns the store containing a given replica. -func (tc *TestCluster) findMemberStore(storeID roachpb.StoreID) (*kvserver.Store, error) { +// findMemberServer returns the server containing a given store. +func (tc *TestCluster) findMemberServer(storeID roachpb.StoreID) (*server.TestServer, error) { for _, server := range tc.Servers { if server.Stores().HasStore(storeID) { - store, err := server.Stores().GetStore(storeID) - if err != nil { - return nil, err - } - return store, nil + return server, nil } } return nil, errors.Errorf("store not found") } +// findMemberStore returns the store with a given ID. +func (tc *TestCluster) findMemberStore(storeID roachpb.StoreID) (*kvserver.Store, error) { + server, err := tc.findMemberServer(storeID) + if err != nil { + return nil, err + } + return server.Stores().GetStore(storeID) +} + // WaitForFullReplication waits until all stores in the cluster // have no ranges with replication pending. // @@ -1263,20 +1344,9 @@ func (tc *TestCluster) GetStatusClient( return serverpb.NewStatusClient(cc) } -// HeartbeatLiveness sends a liveness heartbeat on a particular store. 
-func (tc *TestCluster) HeartbeatLiveness(ctx context.Context, storeIdx int) error { - nl := tc.Servers[storeIdx].NodeLiveness().(*liveness.NodeLiveness) - l, ok := nl.Self() - if !ok { - return errors.New("liveness not found") - } - var err error - for r := retry.StartWithCtx(ctx, retry.Options{MaxRetries: 5}); r.Next(); { - if err = nl.Heartbeat(ctx, l); !errors.Is(err, liveness.ErrEpochIncremented) { - break - } - } - return err +// HeartbeatNodeLiveness sends a liveness heartbeat on a particular store. +func (tc *TestCluster) HeartbeatNodeLiveness(storeIdx int) error { + return tc.Servers[storeIdx].HeartbeatNodeLiveness() } type testClusterFactoryImpl struct{}