Skip to content

Commit

Permalink
Merge #89340
Browse files Browse the repository at this point in the history
89340: kv: bypass lease transfer safety checks during joint consensus r=shralex a=nvanbenschoten

This commit adds logic to bypass lease transfer safety checks (added in 034611b) when in a joint configuration and transferring the lease from a VOTER_DEMOTING to a VOTER_INCOMING. We do so because we could get stuck without a path to exit the joint configuration if we rejected this lease transfer while waiting to confirm that the target is up-to-date on its log. That confirmation may never arrive if the target is dead or partitioned away, and while we'd rather not transfer the lease to a dead node, at least we have a mechanism to recover from that state. We also just sent the VOTER_INCOMING a snapshot (as a LEARNER, before promotion), so it is unlikely that the replica is actually dead or behind on its log.

A better alternative here would be to introduce a mechanism to choose an alternate lease transfer target after some amount of time, if the lease transfer to the VOTER_INCOMING cannot be confirmed to be safe. We may do this in the future, but given the proximity to the release and given that this matches the behavior in v22.1, we choose this approach for now.

Release note: None

Release justification: Needed to resolve release blocker.

Fixes: #88667
See also #89564

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Oct 9, 2022
2 parents e06d228 + f29f178 commit 6e06a05
Show file tree
Hide file tree
Showing 18 changed files with 218 additions and 59 deletions.
7 changes: 5 additions & 2 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,9 @@ func (b *Batch) adminUnsplit(splitKeyIn interface{}) {

// adminTransferLease is only exported on DB. It is here for symmetry with the
// other operations.
func (b *Batch) adminTransferLease(key interface{}, target roachpb.StoreID) {
func (b *Batch) adminTransferLease(
key interface{}, target roachpb.StoreID, bypassSafetyChecks bool,
) {
k, err := marshalKey(key)
if err != nil {
b.initResult(0, 0, notRaw, err)
Expand All @@ -744,7 +746,8 @@ func (b *Batch) adminTransferLease(key interface{}, target roachpb.StoreID) {
RequestHeader: roachpb.RequestHeader{
Key: k,
},
Target: target,
Target: target,
BypassSafetyChecks: bypassSafetyChecks,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
Expand Down
13 changes: 12 additions & 1 deletion pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,18 @@ func (db *DB) AdminTransferLease(
ctx context.Context, key interface{}, target roachpb.StoreID,
) error {
b := &Batch{}
b.adminTransferLease(key, target)
b.adminTransferLease(key, target, false /* bypassSafetyChecks */)
return getOneErr(db.Run(ctx, b), b)
}

// AdminTransferLeaseBypassingSafetyChecks behaves like AdminTransferLease,
// except that the lease transfer request it issues has its BypassSafetyChecks
// flag set. See the comment on AdminTransferLeaseRequest.BypassSafetyChecks
// for what that implies.
//
// The request is sent in a fresh single-request batch, and the error (if any)
// from running that batch is returned.
func (db *DB) AdminTransferLeaseBypassingSafetyChecks(
	ctx context.Context, key interface{}, target roachpb.StoreID,
) error {
	batch := new(Batch)
	batch.adminTransferLease(key, target, true /* bypassSafetyChecks */)
	runErr := db.Run(ctx, batch)
	return getOneErr(runErr, batch)
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/asim/storerebalancer/candidate_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ func (sr *simulatorReplica) Stats() *replicastats.RatedSummary {
}

// AdminTransferLease transfers the LeaderLease to another replica.
func (sr *simulatorReplica) AdminTransferLease(ctx context.Context, target roachpb.StoreID) error {
func (sr *simulatorReplica) AdminTransferLease(
ctx context.Context, target roachpb.StoreID, bypassSafetyChecks bool,
) error {
if !sr.state.ValidTransfer(sr.repl.Range(), state.StoreID(target)) {
return errors.Errorf(
"unable to transfer lease for r%d to store %d, invalid transfer.",
Expand Down
105 changes: 102 additions & 3 deletions pkg/kv/kvserver/client_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/tracker"
)

// TestStoreRangeLease verifies that regular ranges (not some special ones at
Expand Down Expand Up @@ -371,6 +373,100 @@ func TestTransferLeaseToVoterDemotingFails(t *testing.T) {
})
}

// TestTransferLeaseDuringJointConfigWithDeadIncomingVoter ensures that the
// lease transfer performed during a joint config replication change that is
// replacing the existing leaseholder does not get stuck even if the existing
// leaseholder cannot prove that the incoming leaseholder is caught up on its
// log. It does so by killing the incoming leaseholder before it receives the
// lease and ensuring that the range is able to exit the joint configuration.
//
// Currently, the range exits by bypassing safety checks during the lease
// transfer, sending the lease to the dead incoming voter, letting the lease
// expire, acquiring the lease on one of the non-demoting voters, and exiting.
// The details here may change in the future, but the goal of this test will
// not.
func TestTransferLeaseDuringJointConfigWithDeadIncomingVoter(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

// The lease request timeout depends on the Raft election timeout, so we set
// it low to get faster lease expiration (800 ms) and speed up the test.
var raftCfg base.RaftConfig
raftCfg.SetDefaults()
raftCfg.RaftHeartbeatIntervalTicks = 1
raftCfg.RaftElectionTimeoutTicks = 2

// Four nodes with manual replication: the test drives every replication
// change itself. ltk is used below to pause the replication change once the
// joint configuration has been entered.
knobs, ltk := makeReplicationTestKnobs()
tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
RaftConfig: raftCfg,
Knobs: knobs,
},
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)

// Add voters on the second and third nodes (targets 1 and 2) to the scratch
// range.
key := tc.ScratchRange(t)
desc := tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
// Make sure n1 has the lease to start with.
err := tc.Server(0).DB().AdminTransferLease(ctx, key, tc.Target(0).StoreID)
require.NoError(t, err)
store0, repl0 := getFirstStoreReplica(t, tc.Server(0), key)

// The test proceeds as follows:
//
// - Send an AdminChangeReplicasRequest to remove n1 (leaseholder) and add n4
// - Stop the replication change after entering the joint configuration
// - Kill n4 and wait until n1 notices
// - Complete the replication change

// Enter joint config.
ltk.withStopAfterJointConfig(func() {
tc.RebalanceVoterOrFatal(ctx, t, key, tc.Target(0), tc.Target(3))
})
// While in the joint configuration the descriptor is expected to list four
// replicas: the outgoing n1, the two voters added above, and the incoming n4.
desc = tc.LookupRangeOrFatal(t, key)
require.Len(t, desc.Replicas().Descriptors(), 4)
require.True(t, desc.Replicas().InAtomicReplicationChange(), desc)

// Kill n4.
tc.StopServer(3)

// Wait for n1 to notice.
testutils.SucceedsSoon(t, func() error {
// Manually report n4 as unreachable to speed up the test.
require.NoError(t, repl0.RaftReportUnreachable(4))
// Check the Raft progress.
// NOTE(review): the require.* calls in this closure fail the test
// immediately instead of being retried; only the StateProbe check below is
// retried by SucceedsSoon.
s := repl0.RaftStatus()
require.Equal(t, raft.StateLeader, s.RaftState)
p := s.Progress
require.Len(t, p, 4)
require.Contains(t, p, uint64(4))
// Retry until n1's progress tracking for replica 4 is in StateProbe.
if p[4].State != tracker.StateProbe {
return errors.Errorf("dead replica not state probe")
}
return nil
})

// Run the range through the replicate queue on n1.
trace, processErr, err := store0.Enqueue(
ctx, "replicate", repl0, true /* skipShouldQueue */, false /* async */)
require.NoError(t, err)
require.NoError(t, processErr)
formattedTrace := trace.String()
// These patterns are matched against the trace of the replicate queue run.
// They describe leaving the joint configuration and the lease transfer away
// from the replica that is being removed.
expectedMessages := []string{
`transitioning out of joint configuration`,
`leaseholder .* is being removed through an atomic replication change, transferring lease to`,
`lease transfer to .* complete`,
}
require.NoError(t, testutils.MatchInOrder(formattedTrace, expectedMessages...))

// Verify that the joint configuration has completed.
// Three voters should remain once the joint configuration has been exited.
desc = tc.LookupRangeOrFatal(t, key)
require.Len(t, desc.Replicas().VoterDescriptors(), 3)
require.False(t, desc.Replicas().InAtomicReplicationChange(), desc)
}

// internalTransferLeaseFailureDuringJointConfig reproduces
// https://github.com/cockroachdb/cockroach/issues/83687
// and makes sure that if lease transfer fails during a joint configuration
Expand Down Expand Up @@ -398,13 +494,16 @@ func internalTransferLeaseFailureDuringJointConfig(t *testing.T, isManual bool)
// range when they are being proposed.
var scratchRangeID int64
shouldFailProposal := func(args kvserverbase.ProposalFilterArgs) bool {
// Block if a ChangeReplicas command is removing a node from our range.
return args.Req.RangeID == roachpb.RangeID(atomic.LoadInt64(&scratchRangeID)) &&
args.Req.IsSingleTransferLeaseRequest()
}
const failureMsg = "injected lease transfer"
knobs.Store.(*kvserver.StoreTestingKnobs).TestingProposalFilter = func(args kvserverbase.ProposalFilterArgs) *roachpb.Error {
if shouldFailProposal(args) {
return roachpb.NewErrorf("Injecting lease transfer failure")
// The lease transfer should be configured to bypass safety checks.
// See maybeTransferLeaseDuringLeaveJoint for an explanation.
require.True(t, args.Req.Requests[0].GetTransferLease().BypassSafetyChecks)
return roachpb.NewErrorf(failureMsg)
}
return nil
}
Expand All @@ -430,7 +529,7 @@ func internalTransferLeaseFailureDuringJointConfig(t *testing.T, isManual bool)
{ChangeType: roachpb.ADD_VOTER, Target: tc.Target(3)},
})
require.Error(t, err)
require.Regexp(t, "Injecting lease transfer failure", err)
require.Regexp(t, failureMsg, err)

// We're now in a joint configuration, n1 already revoked its lease but all
// other replicas think n1 is the leaseholder. As long as n1 is alive, it is
Expand Down
23 changes: 12 additions & 11 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1589,7 +1589,7 @@ func TestLeaseExpirationBasedRangeTransfer(t *testing.T) {
origLease, _ := l.replica0.GetLease()
{
// Transferring the lease to ourself should be a no-op.
if err := l.replica0.AdminTransferLease(ctx, l.replica0Desc.StoreID); err != nil {
if err := l.replica0.AdminTransferLease(ctx, l.replica0Desc.StoreID, false /* bypassSafetyChecks */); err != nil {
t.Fatal(err)
}
newLease, _ := l.replica0.GetLease()
Expand All @@ -1601,12 +1601,12 @@ func TestLeaseExpirationBasedRangeTransfer(t *testing.T) {
{
// An invalid target should result in an error.
const expected = "unable to find store .* in range"
if err := l.replica0.AdminTransferLease(ctx, 1000); !testutils.IsError(err, expected) {
if err := l.replica0.AdminTransferLease(ctx, 1000, false /* bypassSafetyChecks */); !testutils.IsError(err, expected) {
t.Fatalf("expected %s, but found %v", expected, err)
}
}

if err := l.replica0.AdminTransferLease(ctx, l.replica1Desc.StoreID); err != nil {
if err := l.replica0.AdminTransferLease(ctx, l.replica1Desc.StoreID, false /* bypassSafetyChecks */); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -1651,7 +1651,7 @@ func TestLeaseExpirationBasedRangeTransferWithExtension(t *testing.T) {
l := setupLeaseTransferTest(t)
defer l.tc.Stopper().Stop(ctx)
// Ensure that replica1 has the lease.
if err := l.replica0.AdminTransferLease(ctx, l.replica1Desc.StoreID); err != nil {
if err := l.replica0.AdminTransferLease(ctx, l.replica1Desc.StoreID, false /* bypassSafetyChecks */); err != nil {
t.Fatal(err)
}
l.checkHasLease(t, 1)
Expand All @@ -1673,7 +1673,7 @@ func TestLeaseExpirationBasedRangeTransferWithExtension(t *testing.T) {
transferErrCh := make(chan error)
go func() {
// Transfer back from replica1 to replica0.
err := l.replica1.AdminTransferLease(context.Background(), l.replica0Desc.StoreID)
err := l.replica1.AdminTransferLease(context.Background(), l.replica0Desc.StoreID, false /* bypassSafetyChecks */)
// Ignore not leaseholder errors which can arise due to re-proposals.
if errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) {
err = nil
Expand Down Expand Up @@ -1738,7 +1738,7 @@ func TestLeaseExpirationBasedDrainTransferWithExtension(t *testing.T) {
l := setupLeaseTransferTest(t)
defer l.tc.Stopper().Stop(ctx)
// Ensure that replica1 has the lease.
if err := l.replica0.AdminTransferLease(ctx, l.replica1Desc.StoreID); err != nil {
if err := l.replica0.AdminTransferLease(ctx, l.replica1Desc.StoreID, false /* bypassSafetyChecks */); err != nil {
t.Fatal(err)
}
l.checkHasLease(t, 1)
Expand Down Expand Up @@ -1791,7 +1791,7 @@ func TestLeaseExpirationBelowFutureTimeRequest(t *testing.T) {

// Ensure that replica1 has the lease, and that replica0 has also picked up
// on the lease transfer.
require.NoError(t, l.replica0.AdminTransferLease(ctx, l.replica1Desc.StoreID))
require.NoError(t, l.replica0.AdminTransferLease(ctx, l.replica1Desc.StoreID, false /* bypassSafetyChecks */))
l.checkHasLease(t, 1)
preLease, _ := l.replica1.GetLease()
require.Eventually(t, func() bool {
Expand Down Expand Up @@ -2780,9 +2780,6 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) {
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
AllowLeaseTransfersWhenTargetMayNeedSnapshot: true,
},
Server: &server.TestingKnobs{
WallClock: manualClock,
},
Expand Down Expand Up @@ -2872,7 +2869,11 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) {
// Finally, transfer the lease to node 2 while it is still unavailable and
// behind. We try to avoid this case when picking new leaseholders in practice,
// but we're never 100% successful.
tc.TransferRangeLeaseOrFatal(t, *repl0.Desc(), tc.Target(2))
// NOTE: we bypass safety checks because the target node is behind on its log,
// so the lease transfer would be rejected otherwise.
err = tc.Servers[0].DB().AdminTransferLeaseBypassingSafetyChecks(ctx,
repl0.Desc().StartKey.AsRawKey(), tc.Target(2).StoreID)
require.Nil(t, err)

// Remove the partition. A snapshot to node 2 should follow. This snapshot
// will inform node 2 that it is the new leaseholder for the range. Node 2
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/closed_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,10 @@ func countNotLeaseHolderErrors(ba roachpb.BatchRequest, repls []*kvserver.Replic
const (
	// testingTargetDuration is a 300ms duration shared by tests in this
	// package (presumably the closed timestamp target duration; confirm
	// against its users).
	testingTargetDuration = 300 * time.Millisecond

	// testingSideTransportInterval is a 100ms duration shared by tests in this
	// package (presumably the closed timestamp side transport interval;
	// confirm against its users).
	testingSideTransportInterval = 100 * time.Millisecond

	// TODO(nvanbenschoten): this is a pretty bad variable name to leak into the
	// global scope of the kvserver_test package. At least one test was using it
	// unintentionally. Remove it.
	numNodes = 3
)

func replsForRange(
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,13 @@ func (r *Replica) RaftUnlock() {
r.raftMu.Unlock()
}

// RaftReportUnreachable informs this replica's Raft group that the peer with
// the given replica ID is unreachable, by calling RawNode.ReportUnreachable
// from within withRaftGroup. Any error from withRaftGroup is returned.
func (r *Replica) RaftReportUnreachable(id roachpb.ReplicaID) error {
	reportUnreachable := func(rn *raft.RawNode) (bool, error) {
		rn.ReportUnreachable(uint64(id))
		// Never ask withRaftGroup to unquiesce the range and wake the leader.
		const unquiesceAndWakeLeader = false
		return unquiesceAndWakeLeader, nil
	}
	return r.withRaftGroup(true, reportUnreachable)
}

// LastAssignedLeaseIndex returns the last assigned lease index.
func (r *Replica) LastAssignedLeaseIndex() uint64 {
r.mu.RLock()
Expand Down
42 changes: 28 additions & 14 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -1196,20 +1196,22 @@ func (r *Replica) maybeTransferLeaseDuringLeaveJoint(
// the set of full or incoming voters that will remain after the joint configuration is
// complete. If we don't find the current leaseholder there this means it's being removed,
// and we're going to transfer the lease to another voter below, before exiting the JOINT config.
beingRemoved := true
voterIncomingTarget := roachpb.ReplicaDescriptor{}
for _, v := range voters {
if beingRemoved && v.ReplicaID == r.ReplicaID() {
beingRemoved = false
if v.ReplicaID == r.ReplicaID() {
// We are still a voter.
return nil
}
if voterIncomingTarget == (roachpb.ReplicaDescriptor{}) && v.Type == roachpb.VOTER_INCOMING {
voterIncomingTarget = v
}
}
if !beingRemoved {
return nil
}

// We are being removed as a voter.
voterDemotingTarget, err := r.GetReplicaDescriptor()
if err != nil {
return err
}
if voterIncomingTarget == (roachpb.ReplicaDescriptor{}) {
// Couldn't find a VOTER_INCOMING target. When the leaseholder is being
// removed, we only enter a JOINT config if there is a VOTER_INCOMING
Expand All @@ -1221,18 +1223,30 @@ func (r *Replica) maybeTransferLeaseDuringLeaveJoint(
// to continue trying to leave the JOINT config. If this is the case,
// our replica will not be able to leave the JOINT config, but the new
// leaseholder will be able to do so.
log.Infof(ctx, "no VOTER_INCOMING to transfer lease to. This replica probably lost the lease,"+
" but still thinks its the leaseholder. In this case the new leaseholder is expected to "+
log.Warningf(ctx, "no VOTER_INCOMING to transfer lease to. This replica probably lost the "+
"lease, but still thinks its the leaseholder. In this case the new leaseholder is expected to "+
"complete LEAVE_JOINT. Range descriptor: %v", desc)
return nil
}
log.VEventf(ctx, 5, "current leaseholder %v is being removed through an"+
" atomic replication change. Transferring lease to %v", r.String(), voterIncomingTarget)
err := r.store.DB().AdminTransferLease(ctx, r.startKey, voterIncomingTarget.StoreID)
log.VEventf(ctx, 2, "leaseholder %v is being removed through an atomic "+
"replication change, transferring lease to %v", voterDemotingTarget, voterIncomingTarget)
// We bypass safety checks when transferring the lease to the VOTER_INCOMING.
// We do so because we could get stuck without a path to exit the joint
// configuration if we rejected this lease transfer while waiting to confirm
// that the target is up-to-date on its log. That confirmation may never
// arrive if the target is dead or partitioned away, and while we'd rather not
// transfer the lease to a dead node, at least we have a mechanism to recover
// from that state. We also just sent the VOTER_INCOMING a snapshot (as a
// LEARNER, before promotion), so it is unlikely that the replica is actually
// dead or behind on its log.
// TODO(nvanbenschoten): this isn't great. Instead of bypassing safety checks,
// we should build a mechanism to choose an alternate lease transfer target
// after some amount of time.
err = r.store.DB().AdminTransferLeaseBypassingSafetyChecks(ctx, r.startKey, voterIncomingTarget.StoreID)
if err != nil {
return err
}
log.VEventf(ctx, 5, "leaseholder transfer to %v complete", voterIncomingTarget)
log.VEventf(ctx, 2, "lease transfer to %v complete", voterIncomingTarget)
return nil
}

Expand All @@ -1251,7 +1265,7 @@ func (r *Replica) maybeLeaveAtomicChangeReplicas(
return desc, nil
}
// NB: this is matched on in TestMergeQueueSeesLearner.
log.Eventf(ctx, "transitioning out of joint configuration %s", desc)
log.VEventf(ctx, 2, "transitioning out of joint configuration %s", desc)

// If the leaseholder is being demoted, leaving the joint config is only
// possible if we first transfer the lease. A range not being able to exit
Expand Down Expand Up @@ -3691,7 +3705,7 @@ func (r *Replica) adminScatter(
targetStoreID := potentialLeaseTargets[newLeaseholderIdx].StoreID
if targetStoreID != r.store.StoreID() {
log.VEventf(ctx, 2, "randomly transferring lease to s%d", targetStoreID)
if err := r.AdminTransferLease(ctx, targetStoreID); err != nil {
if err := r.AdminTransferLease(ctx, targetStoreID, false /* bypassSafetyChecks */); err != nil {
log.Warningf(ctx, "failed to scatter lease to s%d: %+v", targetStoreID, err)
}
}
Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func newUnloadedReplica(
r.mu.checksums = map[uuid.UUID]*replicaChecksum{}
r.mu.proposalBuf.Init((*replicaProposer)(r), tracker.NewLockfreeTracker(), r.Clock(), r.ClusterSettings())
r.mu.proposalBuf.testing.allowLeaseProposalWhenNotLeader = store.cfg.TestingKnobs.AllowLeaseRequestProposalsWhenNotLeader
r.mu.proposalBuf.testing.allowLeaseTransfersWhenTargetMayNeedSnapshot = store.cfg.TestingKnobs.AllowLeaseTransfersWhenTargetMayNeedSnapshot
r.mu.proposalBuf.testing.dontCloseTimestamps = store.cfg.TestingKnobs.DontCloseTimestamps
r.mu.proposalBuf.testing.submitProposalFilter = store.cfg.TestingKnobs.TestingProposalSubmitFilter

Expand Down
Loading

0 comments on commit 6e06a05

Please sign in to comment.