kv: don't allow lease transfers to replicas in need of snapshot #82758

Merged
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -349,6 +349,7 @@ go_test(
"//pkg/kv/kvserver/protectedts/ptstorage",
"//pkg/kv/kvserver/protectedts/ptutil",
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/raftutil",
"//pkg/kv/kvserver/rditer",
"//pkg/kv/kvserver/readsummary/rspb",
"//pkg/kv/kvserver/replicastats",
1 change: 1 addition & 0 deletions pkg/kv/kvserver/client_relocate_range_test.go
@@ -43,6 +43,7 @@ func relocateAndCheck(
voterTargets []roachpb.ReplicationTarget,
nonVoterTargets []roachpb.ReplicationTarget,
) (retries int) {
t.Helper()
every := log.Every(1 * time.Second)
testutils.SucceedsSoon(t, func() error {
err := tc.Servers[0].DB().
213 changes: 185 additions & 28 deletions pkg/kv/kvserver/client_replica_test.go
@@ -1724,23 +1724,15 @@ func TestRangeLocalUncertaintyLimitAfterNewLease(t *testing.T) {

// Up-replicate the data in the range to node2.
tc.AddVotersOrFatal(t, keyA, tc.Target(1))
replica1 := tc.GetFirstStoreFromServer(t, 0).LookupReplica(roachpb.RKey(keyA))

// Transfer the lease from node1 to node2.
replica2 := tc.GetFirstStoreFromServer(t, 1).LookupReplica(roachpb.RKey(keyA))
replica2Desc, err := replica2.GetReplicaDescriptor()
if err != nil {
t.Fatal(err)
}

// Transfer the lease from node1 to node2.
node1Before := tc.Servers[0].Clock().Now()
tc.TransferRangeLeaseOrFatal(t, *replica2.Desc(), tc.Target(1))
testutils.SucceedsSoon(t, func() error {
if err := replica1.AdminTransferLease(ctx, replica2Desc.StoreID); err != nil {
t.Fatal(err)
}
lease, _ := replica2.GetLease()
if lease.Replica.NodeID != replica2.NodeID() {
return errors.Errorf("expected lease transfer to node2: %s", lease)
return errors.Errorf("expected lease transfer to apply on node2: %s", lease)
}
return nil
})
@@ -2152,20 +2144,15 @@ func TestLeaseInfoRequest(t *testing.T) {

// Transfer the lease to Servers[0] so we start in a known state. Otherwise,
// there might be already a lease owned by a random node.
err = tc.TransferRangeLease(rangeDesc, tc.Target(0))
if err != nil {
t.Fatal(err)
}
tc.TransferRangeLeaseOrFatal(t, rangeDesc, tc.Target(0))

// Now test the LeaseInfo. We might need to loop until the node we query has applied the lease.
validateLeaseholderSoon(t, kvDB0, rangeDesc.StartKey.AsRawKey(), replicas[0], true)

// Transfer the lease to Server 1 and check that LeaseInfoRequest gets the
// right answer.
err = tc.TransferRangeLease(rangeDesc, tc.Target(1))
if err != nil {
t.Fatal(err)
}
tc.TransferRangeLeaseOrFatal(t, rangeDesc, tc.Target(1))

// An inconsistent LeaseInfoRequest on the old lease holder should give us the
// right answer immediately, since the old holder has definitely applied the
// transfer before TransferRangeLease returned.
@@ -2185,10 +2172,7 @@ func TestLeaseInfoRequest(t *testing.T) {

// Transfer the lease to Server 2 and check that LeaseInfoRequest gets the
// right answer.
err = tc.TransferRangeLease(rangeDesc, tc.Target(2))
if err != nil {
t.Fatal(err)
}
tc.TransferRangeLeaseOrFatal(t, rangeDesc, tc.Target(2))

// We're now going to ask servers[1] for the lease info. We don't use kvDB1;
// instead we go directly to the store because otherwise the DistSender might
@@ -2594,6 +2578,15 @@ func TestClearRange(t *testing.T) {
// possible and hard to prevent entirely. The Replica will only learn that it is
// the new leaseholder when it applies the snapshot. When doing so, it should
// make sure to apply the lease-related side-effects to its in-memory state.
//
// EDIT: as of June 2022, we have protection that should make this scenario
// significantly more rare. This test uses a knob to disable the new protection
// so that it can create the scenario where a replica learns that it holds the
// lease through a snapshot. We'll want to keep the test and the corresponding
// logic in applySnapshot around until we can eliminate the scenario entirely.
// See the commentary in github.com/cockroachdb/cockroach/issues/81561 about
// sending Raft logs in Raft snapshots for a discussion about why this may not
// be worth eliminating.
func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -2605,6 +2598,9 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) {
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
AllowLeaseTransfersWhenTargetMayNeedSnapshot: true,
},
Server: &server.TestingKnobs{
WallClock: manualClock,
},
@@ -2661,12 +2657,15 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) {

// Partition node 2 from the rest of its range. Once partitioned, perform
// another write and truncate the Raft log on the two connected nodes. This
// ensures that that when node 2 comes back up it will require a snapshot
// from Raft.
// ensures that when node 2 comes back up it will require a snapshot from
// Raft.
funcs := noopRaftHandlerFuncs()
funcs.dropReq = func(*kvserverpb.RaftMessageRequest) bool {
return true
}
funcs.snapErr = func(*kvserverpb.SnapshotRequest_Header) error {
return errors.New("rejected")
}
tc.Servers[2].RaftTransport().Listen(store2.StoreID(), &unreliableRaftHandler{
rangeID: repl0.GetRangeID(),
RaftMessageHandler: store2,
@@ -2691,9 +2690,7 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) {
// Finally, transfer the lease to node 2 while it is still unavailable and
// behind. We try to avoid this case when picking new leaseholders in practice,
// but we're never 100% successful.
if err := repl0.AdminTransferLease(ctx, store2.Ident.StoreID); err != nil {
t.Fatal(err)
}
tc.TransferRangeLeaseOrFatal(t, *repl0.Desc(), tc.Target(2))

// Remove the partition. A snapshot to node 2 should follow. This snapshot
// will inform node 2 that it is the new leaseholder for the range. Node 2
@@ -2718,6 +2715,166 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) {
})
}

// TestLeaseTransferRejectedIfTargetNeedsSnapshot prevents a regression of
// #81561. It verifies that a replica will reject a lease transfer request if it
// can not guarantee that the lease target is sufficiently caught up on its Raft
// log such that it does not need a Raft snapshot. The test does so by inducing
// a partition, truncating the Raft log, and then trying to transfer the lease to
// the replica that is now cut off from the log. The lease transfer request must
// be rejected.
//
// The test has two variants. The !rejectAfterRevoke variant exercises the
// common but best-effort protection against unsafe lease transfers in
// Replica.AdminTransferLease. The rejectAfterRevoke variant exercises the
// uncommon but airtight protection against unsafe lease transfers in
// propBuf.maybeRejectUnsafeProposalLocked.
func TestLeaseTransferRejectedIfTargetNeedsSnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunTrueAndFalse(t, "reject-after-revoke", func(t *testing.T, rejectAfterRevoke bool) {
var transferLeaseReqBlockOnce sync.Once
transferLeaseReqBlockedC := make(chan struct{})
transferLeaseReqUnblockedC := make(chan struct{})

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
if rejectAfterRevoke && ba.IsSingleTransferLeaseRequest() {
transferLeaseReqBlockOnce.Do(func() {
close(transferLeaseReqBlockedC)
<-transferLeaseReqUnblockedC
})
}
return nil
},
// Speed up the lease transfer retry loop.
LeaseTransferRejectedRetryLoopCount: 2,
},
},
},
})
defer tc.Stopper().Stop(context.Background())
store0 := tc.GetFirstStoreFromServer(t, 0)
store2 := tc.GetFirstStoreFromServer(t, 2)

keyA := tc.ScratchRange(t)
keyB := keyA.Next()
keyC := keyB.Next()

// First, do a couple of writes; we'll use these to determine when
// the dust has settled.
incA := incrementArgs(keyA, 1)
_, pErr := kv.SendWrapped(ctx, store0.TestSender(), incA)
require.Nil(t, pErr)
incC := incrementArgs(keyC, 2)
_, pErr = kv.SendWrapped(ctx, store0.TestSender(), incC)
require.Nil(t, pErr)

tc.AddVotersOrFatal(t, keyA, tc.Targets(1, 2)...)
tc.WaitForValues(t, keyA, []int64{1, 1, 1})
tc.WaitForValues(t, keyC, []int64{2, 2, 2})

repl0 := store0.LookupReplica(roachpb.RKey(keyA))

// Grab the current lease. We'll use it later.
preLease, _, err := tc.FindRangeLease(*repl0.Desc(), nil)
require.NoError(t, err)
require.Equal(t, store0.StoreID(), preLease.Replica.StoreID)

// Partition node 2 from the rest of its range. Once partitioned, perform
// another write and truncate the Raft log on the two connected nodes. This
// ensures that when node 2 comes back up it will require a snapshot from
// Raft.
funcs := noopRaftHandlerFuncs()
funcs.dropReq = func(*kvserverpb.RaftMessageRequest) bool {
return true
}
funcs.snapErr = func(*kvserverpb.SnapshotRequest_Header) error {
return errors.New("rejected")
}
tc.Servers[2].RaftTransport().Listen(store2.StoreID(), &unreliableRaftHandler{
rangeID: repl0.GetRangeID(),
RaftMessageHandler: store2,
unreliableRaftHandlerFuncs: funcs,
})

_, pErr = kv.SendWrapped(ctx, store0.TestSender(), incC)
require.Nil(t, pErr)
tc.WaitForValues(t, keyC, []int64{4, 4, 2})

// If we want the lease transfer rejection to come after the leaseholder has
// revoked its lease, we launch the lease transfer before the log truncation
// and block it after it has passed through the best-effort protection in
// Replica.AdminTransferLease.
transferErrC := make(chan error, 1)
if rejectAfterRevoke {
_ = tc.Stopper().RunAsyncTask(ctx, "transfer lease", func(ctx context.Context) {
transferErrC <- tc.TransferRangeLease(*repl0.Desc(), tc.Target(2))
})
<-transferLeaseReqBlockedC
}

// Truncate the log at index+1 (log entries < N are removed, so this
// includes the increment). This necessitates a snapshot when the
// partitioned replica rejoins the rest of the range.
index := repl0.GetLastIndex()
truncArgs := truncateLogArgs(index+1, repl0.GetRangeID())
truncArgs.Key = keyA
_, pErr = kv.SendWrapped(ctx, store0.TestSender(), truncArgs)
require.Nil(t, pErr)

// Complete or initiate the lease transfer attempt to node 2, which must not
// succeed because node 2 now needs a snapshot.
var transferErr error
if rejectAfterRevoke {
close(transferLeaseReqUnblockedC)
transferErr = <-transferErrC
} else {
transferErr = tc.TransferRangeLease(*repl0.Desc(), tc.Target(2))
}
isRejectedErr := kvserver.IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(transferErr)
require.True(t, isRejectedErr, "%+v", transferErr)

// Remove the partition. A snapshot to node 2 should follow.
tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, store2)
tc.WaitForValues(t, keyC, []int64{4, 4, 4})

// Now that node 2 caught up on the log through a snapshot, we should be
// able to transfer the lease to it successfully.
// NOTE: we used a testing knob to disable automatic lease transfer retries,
// so we use a SucceedsSoon loop.
testutils.SucceedsSoon(t, func() error {
if err := tc.TransferRangeLease(*repl0.Desc(), tc.Target(2)); err != nil {
if kvserver.IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(err) {
return err
}
t.Fatal(err)
}
return nil
})

// Verify that the lease is now held by node 2.
postLease, _, err := tc.FindRangeLease(*repl0.Desc(), nil)
require.NoError(t, err)
require.Equal(t, store2.StoreID(), postLease.Replica.StoreID)

// Additionally, verify that the lease has the expected sequence number. If
// the lease transfer rejection came after the previous lease was revoked,
// then node 0 must have re-acquired the lease (with a new sequence number)
// in order to transfer it to node 2.
expSeq := preLease.Sequence + 1
if rejectAfterRevoke {
expSeq++
}
require.Equal(t, expSeq, postLease.Sequence)
})
}
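
For orientation, the following Go sketch (not part of this diff) shows the general shape of the leader-side check that both protection layers described above rely on: using its local raft.Status, the current leaseholder conservatively decides whether a transfer target may need a Raft snapshot. The helper name, signature, and package placement are illustrative; the BUILD.bazel change suggests the PR's real logic lives in pkg/kv/kvserver/raftutil, whose exact API is not shown here.

package kvserver

import (
	"go.etcd.io/etcd/raft/v3"
	"go.etcd.io/etcd/raft/v3/tracker"
)

// replicaMayNeedSnapshot returns true unless the local replica, acting as Raft
// leader, can prove that the follower identified by replicaID is caught up far
// enough to be served from the retained log (entries >= firstIndex).
func replicaMayNeedSnapshot(st *raft.Status, firstIndex, replicaID uint64) bool {
	if st == nil || st.RaftState != raft.StateLeader {
		// Without Raft leadership there is no Progress information to consult,
		// so nothing can be proven; assume the worst.
		return true
	}
	pr, ok := st.Progress[replicaID]
	if !ok {
		return true
	}
	if pr.State == tracker.StateSnapshot {
		// The leader has already decided this follower needs a snapshot.
		return true
	}
	// If the follower's highest matched entry sits below the first retained
	// log index, the entries it is missing have been truncated away and can
	// only be delivered via snapshot.
	return pr.Match+1 < firstIndex
}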

// TestConcurrentAdminChangeReplicasRequests ensures that when two attempts to
// change replicas for a range race, only one will succeed.
func TestConcurrentAdminChangeReplicasRequests(t *testing.T) {
15 changes: 14 additions & 1 deletion pkg/kv/kvserver/markers.go
@@ -82,8 +82,21 @@ func IsIllegalReplicationChangeError(err error) bool {
var errMarkReplicationChangeInProgress = errors.New("replication change in progress")

// IsReplicationChangeInProgressError detects whether an error (assumed to have
// been emitted a replication change) indicates that the replication change
// been emitted by a replication change) indicates that the replication change
// failed because another replication change was in progress on the range.
func IsReplicationChangeInProgressError(err error) bool {
return errors.Is(err, errMarkReplicationChangeInProgress)
}

var errMarkLeaseTransferRejectedBecauseTargetMayNeedSnapshot = errors.New(
"lease transfer rejected because the target may need a snapshot")

// IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError detects whether an
// error (assumed to have been emitted by a lease transfer request) indicates
// that the lease transfer failed because the current leaseholder could not
// prove that the lease transfer target did not need a Raft snapshot. In order
// to prove this, the current leaseholder must also be the Raft leader, which is
// periodically requested in maybeTransferRaftLeadershipToLeaseholderLocked.
func IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(err error) bool {
return errors.Is(err, errMarkLeaseTransferRejectedBecauseTargetMayNeedSnapshot)
}
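
As a usage illustration (not code from this PR), a caller that initiates lease transfers could use this predicate to distinguish the retryable "target may need a snapshot" rejection from other failures. The helper below and its transfer closure are hypothetical stand-ins for an AdminTransferLease or TransferRangeLease call.

package leaseutil // hypothetical package, for illustration only

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
)

// transferLeaseWithRetry retries transfer only while it fails with the
// "target may need a snapshot" rejection, on the assumption that the target
// will soon be caught up by a Raft snapshot; all other errors are returned.
func transferLeaseWithRetry(ctx context.Context, transfer func(context.Context) error) error {
	for {
		err := transfer(ctx)
		if err == nil {
			return nil
		}
		if !kvserver.IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError(err) {
			return err
		}
		// Back off briefly before retrying; the snapshot should catch the
		// target up shortly.
		select {
		case <-time.After(50 * time.Millisecond):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}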
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_init.go
@@ -104,6 +104,7 @@ func newUnloadedReplica(
r.mu.checksums = map[uuid.UUID]replicaChecksum{}
r.mu.proposalBuf.Init((*replicaProposer)(r), tracker.NewLockfreeTracker(), r.Clock(), r.ClusterSettings())
r.mu.proposalBuf.testing.allowLeaseProposalWhenNotLeader = store.cfg.TestingKnobs.AllowLeaseRequestProposalsWhenNotLeader
r.mu.proposalBuf.testing.allowLeaseTransfersWhenTargetMayNeedSnapshot = store.cfg.TestingKnobs.AllowLeaseTransfersWhenTargetMayNeedSnapshot
r.mu.proposalBuf.testing.dontCloseTimestamps = store.cfg.TestingKnobs.DontCloseTimestamps
r.mu.proposalBuf.testing.submitProposalFilter = store.cfg.TestingKnobs.TestingProposalSubmitFilter

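
To tie the pieces together, here is a schematic Go sketch (again, not part of this diff, and not the proposal buffer's real code) of how the knob wired in above, a progress check like the replicaMayNeedSnapshot helper sketched earlier, and the error marker from markers.go could combine when a lease transfer proposal is evaluated. It is written as if it lived in package kvserver so it can reference the unexported marker.

package kvserver

import (
	"github.com/cockroachdb/errors"
	"go.etcd.io/etcd/raft/v3"
)

// maybeRejectUnsafeLeaseTransfer gates a proposed lease transfer on the
// AllowLeaseTransfersWhenTargetMayNeedSnapshot testing knob and on a
// conservative "may need snapshot" check (replicaMayNeedSnapshot is the helper
// sketched earlier; its real counterpart in this PR may differ).
func maybeRejectUnsafeLeaseTransfer(
	st *raft.Status, firstIndex, targetReplicaID uint64, allowUnsafe bool,
) error {
	if allowUnsafe {
		// Testing-only escape hatch, mirroring the knob plumbed into the
		// proposal buffer in replica_init.go above.
		return nil
	}
	if replicaMayNeedSnapshot(st, firstIndex, targetReplicaID) {
		// Attach the marker so that callers can detect this failure mode with
		// IsLeaseTransferRejectedBecauseTargetMayNeedSnapshotError and retry.
		return errors.Mark(
			errors.Errorf("refusing to transfer lease to replica %d: target may need a Raft snapshot", targetReplicaID),
			errMarkLeaseTransferRejectedBecauseTargetMayNeedSnapshot)
	}
	return nil
}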