Skip to content

Commit

Permalink
Merge pull request #89621 from nvanbenschoten/backport22.2.0-89340
Browse files Browse the repository at this point in the history
release-22.2.0: kv: bypass lease transfer safety checks during joint consensus
  • Loading branch information
nvanbenschoten authored Oct 10, 2022
2 parents 776e9f8 + e6b99fd commit 6ba7a6b
Show file tree
Hide file tree
Showing 16 changed files with 213 additions and 56 deletions.
7 changes: 5 additions & 2 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,9 @@ func (b *Batch) adminUnsplit(splitKeyIn interface{}) {

// adminTransferLease is only exported on DB. It is here for symmetry with the
// other operations.
func (b *Batch) adminTransferLease(key interface{}, target roachpb.StoreID) {
func (b *Batch) adminTransferLease(
key interface{}, target roachpb.StoreID, bypassSafetyChecks bool,
) {
k, err := marshalKey(key)
if err != nil {
b.initResult(0, 0, notRaw, err)
Expand All @@ -745,7 +747,8 @@ func (b *Batch) adminTransferLease(key interface{}, target roachpb.StoreID) {
RequestHeader: roachpb.RequestHeader{
Key: k,
},
Target: target,
Target: target,
BypassSafetyChecks: bypassSafetyChecks,
}
b.appendReqs(req)
b.initResult(1, 0, notRaw, nil)
Expand Down
13 changes: 12 additions & 1 deletion pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,18 @@ func (db *DB) AdminTransferLease(
ctx context.Context, key interface{}, target roachpb.StoreID,
) error {
b := &Batch{}
b.adminTransferLease(key, target)
b.adminTransferLease(key, target, false /* bypassSafetyChecks */)
return getOneErr(db.Run(ctx, b), b)
}

// AdminTransferLeaseBypassingSafetyChecks is like AdminTransferLease, but
// configures the lease transfer to bypass safety checks. See the comment on
// AdminTransferLeaseRequest.BypassSafetyChecks for details.
func (db *DB) AdminTransferLeaseBypassingSafetyChecks(
ctx context.Context, key interface{}, target roachpb.StoreID,
) error {
b := &Batch{}
b.adminTransferLease(key, target, true /* bypassSafetyChecks */)
return getOneErr(db.Run(ctx, b), b)
}

Expand Down
105 changes: 102 additions & 3 deletions pkg/kv/kvserver/client_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/tracker"
)

// TestStoreRangeLease verifies that regular ranges (not some special ones at
Expand Down Expand Up @@ -371,6 +373,100 @@ func TestTransferLeaseToVoterDemotingFails(t *testing.T) {
})
}

// TestTransferLeaseDuringJointConfigWithDeadIncomingVoter ensures that the
// lease transfer performed during a joint config replication change that is
// replacing the existing leaseholder does not get stuck even if the existing
// leaseholder cannot prove that the incoming leaseholder is caught up on its
// log. It does so by killing the incoming leaseholder before it receives the
// lease and ensuring that the range is able to exit the joint configuration.
//
// Currently, the range exits by bypassing safety checks during the lease
// transfer, sending the lease to the dead incoming voter, letting the lease
// expire, acquiring the lease on one of the non-demoting voters, and exiting.
// The details here may change in the future, but the goal of this test will
// not.
func TestTransferLeaseDuringJointConfigWithDeadIncomingVoter(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

// The lease request timeout depends on the Raft election timeout, so we set
// it low to get faster lease expiration (800 ms) and speed up the test.
var raftCfg base.RaftConfig
raftCfg.SetDefaults()
raftCfg.RaftHeartbeatIntervalTicks = 1
raftCfg.RaftElectionTimeoutTicks = 2

knobs, ltk := makeReplicationTestKnobs()
tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
RaftConfig: raftCfg,
Knobs: knobs,
},
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)

key := tc.ScratchRange(t)
desc := tc.AddVotersOrFatal(t, key, tc.Targets(1, 2)...)
// Make sure n1 has the lease to start with.
err := tc.Server(0).DB().AdminTransferLease(ctx, key, tc.Target(0).StoreID)
require.NoError(t, err)
store0, repl0 := getFirstStoreReplica(t, tc.Server(0), key)

// The test proceeds as follows:
//
// - Send an AdminChangeReplicasRequest to remove n1 (leaseholder) and add n4
// - Stop the replication change after entering the joint configuration
// - Kill n4 and wait until n1 notices
// - Complete the replication change

// Enter joint config.
ltk.withStopAfterJointConfig(func() {
tc.RebalanceVoterOrFatal(ctx, t, key, tc.Target(0), tc.Target(3))
})
desc = tc.LookupRangeOrFatal(t, key)
require.Len(t, desc.Replicas().Descriptors(), 4)
require.True(t, desc.Replicas().InAtomicReplicationChange(), desc)

// Kill n4.
tc.StopServer(3)

// Wait for n1 to notice.
testutils.SucceedsSoon(t, func() error {
// Manually report n4 as unreachable to speed up the test.
require.NoError(t, repl0.RaftReportUnreachable(4))
// Check the Raft progress.
s := repl0.RaftStatus()
require.Equal(t, raft.StateLeader, s.RaftState)
p := s.Progress
require.Len(t, p, 4)
require.Contains(t, p, uint64(4))
if p[4].State != tracker.StateProbe {
return errors.Errorf("dead replica not state probe")
}
return nil
})

// Run the range through the replicate queue on n1.
trace, processErr, err := store0.Enqueue(
ctx, "replicate", repl0, true /* skipShouldQueue */, false /* async */)
require.NoError(t, err)
require.NoError(t, processErr)
formattedTrace := trace.String()
expectedMessages := []string{
`transitioning out of joint configuration`,
`leaseholder .* is being removed through an atomic replication change, transferring lease to`,
`lease transfer to .* complete`,
}
require.NoError(t, testutils.MatchInOrder(formattedTrace, expectedMessages...))

// Verify that the joint configuration has completed.
desc = tc.LookupRangeOrFatal(t, key)
require.Len(t, desc.Replicas().VoterDescriptors(), 3)
require.False(t, desc.Replicas().InAtomicReplicationChange(), desc)
}

// internalTransferLeaseFailureDuringJointConfig reproduces
// https://github.com/cockroachdb/cockroach/issues/83687
// and makes sure that if lease transfer fails during a joint configuration
Expand Down Expand Up @@ -398,13 +494,16 @@ func internalTransferLeaseFailureDuringJointConfig(t *testing.T, isManual bool)
// range when they are being proposed.
var scratchRangeID int64
shouldFailProposal := func(args kvserverbase.ProposalFilterArgs) bool {
// Block if a ChangeReplicas command is removing a node from our range.
return args.Req.RangeID == roachpb.RangeID(atomic.LoadInt64(&scratchRangeID)) &&
args.Req.IsSingleTransferLeaseRequest()
}
const failureMsg = "injected lease transfer"
knobs.Store.(*kvserver.StoreTestingKnobs).TestingProposalFilter = func(args kvserverbase.ProposalFilterArgs) *roachpb.Error {
if shouldFailProposal(args) {
return roachpb.NewErrorf("Injecting lease transfer failure")
// The lease transfer should be configured to bypass safety checks.
// See maybeTransferLeaseDuringLeaveJoint for an explanation.
require.True(t, args.Req.Requests[0].GetTransferLease().BypassSafetyChecks)
return roachpb.NewErrorf(failureMsg)
}
return nil
}
Expand All @@ -430,7 +529,7 @@ func internalTransferLeaseFailureDuringJointConfig(t *testing.T, isManual bool)
{ChangeType: roachpb.ADD_VOTER, Target: tc.Target(3)},
})
require.Error(t, err)
require.Regexp(t, "Injecting lease transfer failure", err)
require.Regexp(t, failureMsg, err)

// We're now in a joint configuration, n1 already revoked its lease but all
// other replicas think n1 is the leaseholder. As long as n1 is alive, it is
Expand Down
23 changes: 12 additions & 11 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1589,7 +1589,7 @@ func TestLeaseExpirationBasedRangeTransfer(t *testing.T) {
origLease, _ := l.replica0.GetLease()
{
// Transferring the lease to ourself should be a no-op.
if err := l.replica0.AdminTransferLease(ctx, l.replica0Desc.StoreID); err != nil {
if err := l.replica0.AdminTransferLease(ctx, l.replica0Desc.StoreID, false /* bypassSafetyChecks */); err != nil {
t.Fatal(err)
}
newLease, _ := l.replica0.GetLease()
Expand All @@ -1601,12 +1601,12 @@ func TestLeaseExpirationBasedRangeTransfer(t *testing.T) {
{
// An invalid target should result in an error.
const expected = "unable to find store .* in range"
if err := l.replica0.AdminTransferLease(ctx, 1000); !testutils.IsError(err, expected) {
if err := l.replica0.AdminTransferLease(ctx, 1000, false /* bypassSafetyChecks */); !testutils.IsError(err, expected) {
t.Fatalf("expected %s, but found %v", expected, err)
}
}

if err := l.replica0.AdminTransferLease(ctx, l.replica1Desc.StoreID); err != nil {
if err := l.replica0.AdminTransferLease(ctx, l.replica1Desc.StoreID, false /* bypassSafetyChecks */); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -1651,7 +1651,7 @@ func TestLeaseExpirationBasedRangeTransferWithExtension(t *testing.T) {
l := setupLeaseTransferTest(t)
defer l.tc.Stopper().Stop(ctx)
// Ensure that replica1 has the lease.
if err := l.replica0.AdminTransferLease(ctx, l.replica1Desc.StoreID); err != nil {
if err := l.replica0.AdminTransferLease(ctx, l.replica1Desc.StoreID, false /* bypassSafetyChecks */); err != nil {
t.Fatal(err)
}
l.checkHasLease(t, 1)
Expand All @@ -1673,7 +1673,7 @@ func TestLeaseExpirationBasedRangeTransferWithExtension(t *testing.T) {
transferErrCh := make(chan error)
go func() {
// Transfer back from replica1 to replica0.
err := l.replica1.AdminTransferLease(context.Background(), l.replica0Desc.StoreID)
err := l.replica1.AdminTransferLease(context.Background(), l.replica0Desc.StoreID, false /* bypassSafetyChecks */)
// Ignore not leaseholder errors which can arise due to re-proposals.
if errors.HasType(err, (*roachpb.NotLeaseHolderError)(nil)) {
err = nil
Expand Down Expand Up @@ -1738,7 +1738,7 @@ func TestLeaseExpirationBasedDrainTransferWithExtension(t *testing.T) {
l := setupLeaseTransferTest(t)
defer l.tc.Stopper().Stop(ctx)
// Ensure that replica1 has the lease.
if err := l.replica0.AdminTransferLease(ctx, l.replica1Desc.StoreID); err != nil {
if err := l.replica0.AdminTransferLease(ctx, l.replica1Desc.StoreID, false /* bypassSafetyChecks */); err != nil {
t.Fatal(err)
}
l.checkHasLease(t, 1)
Expand Down Expand Up @@ -1791,7 +1791,7 @@ func TestLeaseExpirationBelowFutureTimeRequest(t *testing.T) {

// Ensure that replica1 has the lease, and that replica0 has also picked up
// on the lease transfer.
require.NoError(t, l.replica0.AdminTransferLease(ctx, l.replica1Desc.StoreID))
require.NoError(t, l.replica0.AdminTransferLease(ctx, l.replica1Desc.StoreID, false /* bypassSafetyChecks */))
l.checkHasLease(t, 1)
preLease, _ := l.replica1.GetLease()
require.Eventually(t, func() bool {
Expand Down Expand Up @@ -2780,9 +2780,6 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) {
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
AllowLeaseTransfersWhenTargetMayNeedSnapshot: true,
},
Server: &server.TestingKnobs{
WallClock: manualClock,
},
Expand Down Expand Up @@ -2872,7 +2869,11 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) {
// Finally, transfer the lease to node 2 while it is still unavailable and
// behind. We try to avoid this case when picking new leaseholders in practice,
// but we're never 100% successful.
tc.TransferRangeLeaseOrFatal(t, *repl0.Desc(), tc.Target(2))
// NOTE: we bypass safety checks because the target node is behind on its log,
// so the lease transfer would be rejected otherwise.
err = tc.Servers[0].DB().AdminTransferLeaseBypassingSafetyChecks(ctx,
repl0.Desc().StartKey.AsRawKey(), tc.Target(2).StoreID)
require.Nil(t, err)

// Remove the partition. A snapshot to node 2 should follow. This snapshot
// will inform node 2 that it is the new leaseholder for the range. Node 2
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/closed_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,10 @@ func countNotLeaseHolderErrors(ba roachpb.BatchRequest, repls []*kvserver.Replic
const testingTargetDuration = 300 * time.Millisecond

const testingSideTransportInterval = 100 * time.Millisecond

// TODO(nvanbenschoten): this is a pretty bad variable name to leak into the
// global scope of the kvserver_test package. At least one test was using it
// unintentionally. Remove it.
const numNodes = 3

func replsForRange(
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,13 @@ func (r *Replica) RaftUnlock() {
r.raftMu.Unlock()
}

func (r *Replica) RaftReportUnreachable(id roachpb.ReplicaID) error {
return r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) {
raftGroup.ReportUnreachable(uint64(id))
return false /* unquiesceAndWakeLeader */, nil
})
}

// LastAssignedLeaseIndexRLocked returns the last assigned lease index.
func (r *Replica) LastAssignedLeaseIndex() uint64 {
r.mu.RLock()
Expand Down
42 changes: 28 additions & 14 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -1196,20 +1196,22 @@ func (r *Replica) maybeTransferLeaseDuringLeaveJoint(
// the set of full or incoming voters that will remain after the joint configuration is
// complete. If we don't find the current leaseholder there this means it's being removed,
// and we're going to transfer the lease to another voter below, before exiting the JOINT config.
beingRemoved := true
voterIncomingTarget := roachpb.ReplicaDescriptor{}
for _, v := range voters {
if beingRemoved && v.ReplicaID == r.ReplicaID() {
beingRemoved = false
if v.ReplicaID == r.ReplicaID() {
// We are still a voter.
return nil
}
if voterIncomingTarget == (roachpb.ReplicaDescriptor{}) && v.Type == roachpb.VOTER_INCOMING {
voterIncomingTarget = v
}
}
if !beingRemoved {
return nil
}

// We are being removed as a voter.
voterDemotingTarget, err := r.GetReplicaDescriptor()
if err != nil {
return err
}
if voterIncomingTarget == (roachpb.ReplicaDescriptor{}) {
// Couldn't find a VOTER_INCOMING target. When the leaseholder is being
// removed, we only enter a JOINT config if there is a VOTER_INCOMING
Expand All @@ -1221,18 +1223,30 @@ func (r *Replica) maybeTransferLeaseDuringLeaveJoint(
// to continue trying to leave the JOINT config. If this is the case,
// our replica will not be able to leave the JOINT config, but the new
// leaseholder will be able to do so.
log.Infof(ctx, "no VOTER_INCOMING to transfer lease to. This replica probably lost the lease,"+
" but still thinks its the leaseholder. In this case the new leaseholder is expected to "+
log.Warningf(ctx, "no VOTER_INCOMING to transfer lease to. This replica probably lost the "+
"lease, but still thinks its the leaseholder. In this case the new leaseholder is expected to "+
"complete LEAVE_JOINT. Range descriptor: %v", desc)
return nil
}
log.VEventf(ctx, 5, "current leaseholder %v is being removed through an"+
" atomic replication change. Transferring lease to %v", r.String(), voterIncomingTarget)
err := r.store.DB().AdminTransferLease(ctx, r.startKey, voterIncomingTarget.StoreID)
log.VEventf(ctx, 2, "leaseholder %v is being removed through an atomic "+
"replication change, transferring lease to %v", voterDemotingTarget, voterIncomingTarget)
// We bypass safety checks when transferring the lease to the VOTER_INCOMING.
// We do so because we could get stuck without a path to exit the joint
// configuration if we rejected this lease transfer while waiting to confirm
// that the target is up-to-date on its log. That confirmation may never
// arrive if the target is dead or partitioned away, and while we'd rather not
// transfer the lease to a dead node, at least we have a mechanism to recovery
// from that state. We also just sent the VOTER_INCOMING a snapshot (as a
// LEARNER, before promotion), so it is unlikely that the replica is actually
// dead or behind on its log.
// TODO(nvanbenschoten): this isn't great. Instead of bypassing safety checks,
// we should build a mechanism to choose an alternate lease transfer target
// after some amount of time.
err = r.store.DB().AdminTransferLeaseBypassingSafetyChecks(ctx, r.startKey, voterIncomingTarget.StoreID)
if err != nil {
return err
}
log.VEventf(ctx, 5, "leaseholder transfer to %v complete", voterIncomingTarget)
log.VEventf(ctx, 2, "lease transfer to %v complete", voterIncomingTarget)
return nil
}

Expand All @@ -1251,7 +1265,7 @@ func (r *Replica) maybeLeaveAtomicChangeReplicas(
return desc, nil
}
// NB: this is matched on in TestMergeQueueSeesLearner.
log.Eventf(ctx, "transitioning out of joint configuration %s", desc)
log.VEventf(ctx, 2, "transitioning out of joint configuration %s", desc)

// If the leaseholder is being demoted, leaving the joint config is only
// possible if we first transfer the lease. A range not being able to exit
Expand Down Expand Up @@ -3635,7 +3649,7 @@ func (r *Replica) adminScatter(
targetStoreID := potentialLeaseTargets[newLeaseholderIdx].StoreID
if targetStoreID != r.store.StoreID() {
log.VEventf(ctx, 2, "randomly transferring lease to s%d", targetStoreID)
if err := r.AdminTransferLease(ctx, targetStoreID); err != nil {
if err := r.AdminTransferLease(ctx, targetStoreID, false /* bypassSafetyChecks */); err != nil {
log.Warningf(ctx, "failed to scatter lease to s%d: %+v", targetStoreID, err)
}
}
Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func newUnloadedReplica(
r.mu.checksums = map[uuid.UUID]*replicaChecksum{}
r.mu.proposalBuf.Init((*replicaProposer)(r), tracker.NewLockfreeTracker(), r.Clock(), r.ClusterSettings())
r.mu.proposalBuf.testing.allowLeaseProposalWhenNotLeader = store.cfg.TestingKnobs.AllowLeaseRequestProposalsWhenNotLeader
r.mu.proposalBuf.testing.allowLeaseTransfersWhenTargetMayNeedSnapshot = store.cfg.TestingKnobs.AllowLeaseTransfersWhenTargetMayNeedSnapshot
r.mu.proposalBuf.testing.dontCloseTimestamps = store.cfg.TestingKnobs.DontCloseTimestamps
r.mu.proposalBuf.testing.submitProposalFilter = store.cfg.TestingKnobs.TestingProposalSubmitFilter

Expand Down
6 changes: 1 addition & 5 deletions pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,6 @@ type propBuf struct {
// heartbeats and then expect other replicas to take the lease without
// worrying about Raft).
allowLeaseProposalWhenNotLeader bool
// allowLeaseTransfersWhenTargetMayNeedSnapshot, if set, makes the proposal
// buffer allow lease request proposals even when the proposer cannot prove
// that the lease transfer target does not need a Raft snapshot.
allowLeaseTransfersWhenTargetMayNeedSnapshot bool
// dontCloseTimestamps inhibits the closing of timestamps.
dontCloseTimestamps bool
}
Expand Down Expand Up @@ -699,7 +695,7 @@ func (b *propBuf) maybeRejectUnsafeProposalLocked(
newLease := p.command.ReplicatedEvalResult.State.Lease
newLeaseTarget := newLease.Replica.ReplicaID
snapStatus := raftutil.ReplicaMayNeedSnapshot(&status, firstIndex, newLeaseTarget)
if snapStatus != raftutil.NoSnapshotNeeded && !b.testing.allowLeaseTransfersWhenTargetMayNeedSnapshot {
if snapStatus != raftutil.NoSnapshotNeeded && !p.Request.Requests[0].GetTransferLease().BypassSafetyChecks {
b.p.rejectProposalWithLeaseTransferRejectedLocked(ctx, p, newLease, snapStatus)
return true
}
Expand Down
Loading

0 comments on commit 6ba7a6b

Please sign in to comment.