kvserver: lease transfer in JOINT configuration
Previously:
1. Removing a leaseholder was not allowed.
2. A VOTER_INCOMING node wasn't able to accept the lease.

Because of (1), users needed to transfer the lease before removing
the leaseholder. Because of (2), when relocating a range from the
leaseholder A to a new node B, there was no way to transfer the
lease to B before it was fully added as a VOTER. Adding it as a
voter first, however, could degrade fault tolerance. For example,
suppose A and B are in region R1, C in region R2 and D in R3, and
the range starts as (A, C, D). Adding B to replace A results in the
intermediate configuration (A, B, C, D), in which a failure of R1
makes the range unavailable since no quorum can be established.
Since B therefore couldn't be added before A was removed, the
system would transfer the lease to C, remove A and add B, and then
transfer the lease again to B. This resulted in a temporary
migration of leases out of their preferred region, an imbalance in
lease counts, and degraded performance.

This PR fixes both issues: (1) the leaseholder can now be removed,
with the lease transferred away right before we exit the JOINT
config, and (2) a VOTER_INCOMING replica can now accept the lease.

Release note: None
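To make the two changes concrete, here is a minimal, self-contained Go sketch of the rules described above. The type and function names are illustrative assumptions for this description, not CockroachDB's actual API.

package main

import "fmt"

// ReplicaType is a stand-in for the replica states that matter here.
type ReplicaType int

const (
	VoterFull ReplicaType = iota
	VoterIncoming // being added as part of a joint configuration
	VoterDemoting // being removed as part of a joint configuration
	Learner
)

// canReceiveLease sketches change (2): both full voters and incoming voters
// may now accept the lease, so a replacement node can take it before the
// joint configuration is exited.
func canReceiveLease(t ReplicaType) bool {
	return t == VoterFull || t == VoterIncoming
}

// mustTransferBeforeExit sketches change (1): if the current leaseholder is
// being demoted, the lease is transferred away right before exiting the
// joint configuration instead of the removal being rejected.
func mustTransferBeforeExit(leaseholder ReplicaType) bool {
	return leaseholder == VoterDemoting
}

func main() {
	fmt.Println(canReceiveLease(VoterIncoming))        // true after this change
	fmt.Println(mustTransferBeforeExit(VoterDemoting)) // true: transfer, then exit the joint config
}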

shralex committed Dec 20, 2021
1 parent 362bde3 commit 527263a
Showing 17 changed files with 465 additions and 228 deletions.
8 changes: 7 additions & 1 deletion pkg/kv/db.go
@@ -624,7 +624,13 @@ func (db *DB) AdminTransferLease(
) error {
b := &Batch{}
b.adminTransferLease(key, target)
return getOneErr(db.Run(ctx, b), b)
err := db.Run(ctx, b)
if err == nil {
log.Infof(ctx, "Transferring lease to StoreID %s succeeded", target.String())
} else {
log.Infof(ctx, "Transferring lease to StoreID %s failed with error: %s", target.String(), err)
}
return getOneErr(err, b)
}

// AdminChangeReplicas adds or removes a set of replicas for a range.
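For orientation, a hypothetical caller-side sketch (it assumes the public kv and roachpb packages and an already-constructed *kv.DB; nothing below comes from this diff) showing the call whose outcome the new log lines record:

package leaseexample

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// transferLease asks the range containing key to move its lease to target.
// With this change, AdminTransferLease itself logs both the success and the
// failure case, so callers get a trace of lease movement without extra code.
func transferLease(ctx context.Context, db *kv.DB, key roachpb.Key, target roachpb.StoreID) error {
	return db.AdminTransferLease(ctx, key, target)
}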
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/allocator.go
@@ -1378,7 +1378,8 @@ func (a *Allocator) TransferLeaseTarget(
}
fallthrough
case decideWithoutStats:
if !a.shouldTransferLeaseForLeaseCountConvergence(ctx, sl, source, existing) {
if !opts.disableLeaseCountConvergenceChecks &&
!a.shouldTransferLeaseForLeaseCountConvergence(ctx, sl, source, existing) {
return roachpb.ReplicaDescriptor{}
}
case shouldTransfer:
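The new disableLeaseCountConvergenceChecks option short-circuits the convergence check so a transfer target can still be chosen, e.g. while exiting a joint configuration. A minimal, self-contained sketch of that pattern follows; the names are illustrative, not the allocator's real signatures.

package main

import "fmt"

type transferLeaseOptions struct {
	disableLeaseCountConvergenceChecks bool
}

// shouldTransferForConvergence stands in for the allocator's lease-count
// convergence heuristic: only transfer when it moves the source toward the
// mean lease count.
func shouldTransferForConvergence(sourceLeases, meanLeases int) bool {
	return sourceLeases > meanLeases
}

// pickTarget mirrors the shape of the diff: the convergence check is skipped
// entirely when the option is set, so a candidate is returned even if lease
// counts would not converge.
func pickTarget(opts transferLeaseOptions, sourceLeases, meanLeases int, candidates []string) string {
	if !opts.disableLeaseCountConvergenceChecks &&
		!shouldTransferForConvergence(sourceLeases, meanLeases) {
		return "" // no transfer
	}
	if len(candidates) == 0 {
		return ""
	}
	return candidates[0]
}

func main() {
	// The source holds fewer leases than the mean, so the convergence check
	// alone would block the transfer; the flag forces a target anyway.
	opts := transferLeaseOptions{disableLeaseCountConvergenceChecks: true}
	fmt.Println(pickTarget(opts, 3, 5, []string{"s2", "s4"})) // prints "s2"
}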
253 changes: 227 additions & 26 deletions pkg/kv/kvserver/client_lease_test.go
@@ -16,7 +16,6 @@ import (
"math"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
@@ -568,6 +567,223 @@ func TestLeasePreferencesRebalance(t *testing.T) {
})
}

// TestLeaseholderLocalRelocate verifies that when the leaseholder is relocated, the lease can be
// transferred directly to the new node. This verifies https://github.com/cockroachdb/cockroach/issues/67740.
func TestLeaseholderLocalRelocate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

stickyRegistry := server.NewStickyInMemEnginesRegistry()
defer stickyRegistry.CloseAllStickyInMemEngines()
ctx := context.Background()
manualClock := hlc.NewHybridManualClock()
zcfg := zonepb.DefaultZoneConfig()
zcfg.LeasePreferences = []zonepb.LeasePreference{
{
Constraints: []zonepb.Constraint{
{Type: zonepb.Constraint_REQUIRED, Key: "region", Value: "us"},
},
},
}

serverArgs := make(map[int]base.TestServerArgs)
locality := func(region string) roachpb.Locality {
return roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "region", Value: region},
},
}
}
localities := []roachpb.Locality{
locality("eu"),
locality("eu"),
locality("us"),
locality("us"),
}

const numStores = 4
const numNodes = 4
for i := 0; i < numNodes; i++ {
serverArgs[i] = base.TestServerArgs{
Locality: localities[i],
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
ClockSource: manualClock.UnixNano,
DefaultZoneConfigOverride: &zcfg,
StickyEngineRegistry: stickyRegistry,
},
},
StoreSpecs: []base.StoreSpec{
{
InMemory: true,
StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10),
},
},
}
}
tc := testcluster.StartTestCluster(t, numNodes,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: serverArgs,
})
defer tc.Stopper().Stop(ctx)

_, rhsDesc := tc.SplitRangeOrFatal(t, keys.UserTableDataMin)
tc.AddVotersOrFatal(t, rhsDesc.StartKey.AsRawKey(), tc.Targets(1, 2)...)

// We start with having the range under test on (1,2,3).
db := tc.ServerConn(0)

// Manually move lease out of preference.
tc.TransferRangeLeaseOrFatal(t, rhsDesc, tc.Target(0))

// Check that the lease is on 1
leaseHolder, err := tc.FindRangeLeaseHolder(rhsDesc, nil)
require.NoError(t, err)
require.Equal(t, tc.Target(0), leaseHolder)

// Cause the lease to be moved based on lease preferences (to node 3, the only us node with a replica).
tc.GetFirstStoreFromServer(t, 0).SetReplicateQueueActive(true)
require.NoError(t, tc.GetFirstStoreFromServer(t, 0).ForceReplicationScanAndProcess())

// Check that the lease moved to 3
leaseHolder, err = tc.FindRangeLeaseHolder(rhsDesc, nil)
require.NoError(t, err)
require.Equal(t, tc.Target(2), leaseHolder)

gossipLiveness(t, tc)

// Relocate range 3 -> 4.
_, err = db.Exec("ALTER RANGE " + rhsDesc.RangeID.String() + " RELOCATE FROM 3 TO 4")
require.NoError(t, err)

// Make sure lease moved to 4
leaseHolder, err = tc.FindRangeLeaseHolder(rhsDesc, nil)
require.NoError(t, err)
require.Equal(t, tc.Target(3), leaseHolder)

// Double check that lease moved directly 3 -> 4
repl := tc.GetFirstStoreFromServer(t, 3).LookupReplica(roachpb.RKey(rhsDesc.StartKey.AsRawKey()))
history := repl.GetLeaseHistory()
require.Equal(t, tc.Target(2).NodeID, history[len(history)-2].Replica.NodeID)
require.Equal(t, tc.Target(3).NodeID, history[len(history)-1].Replica.NodeID)
}

func gossipLiveness(t *testing.T, tc *testcluster.TestCluster) {
for i := range tc.Servers {
testutils.SucceedsSoon(t, tc.Servers[i].HeartbeatNodeLiveness)
}
// Make sure that all store pools have seen liveness heartbeats from everyone.
testutils.SucceedsSoon(t, func() error {
for i := range tc.Servers {
for j := range tc.Servers {
live, err := tc.GetFirstStoreFromServer(t, i).GetStoreConfig().StorePool.IsLive(tc.Target(j).StoreID)
if err != nil {
return err
}
if !live {
return errors.Errorf("Expected server %d to be suspect on server %d", j, i)
}
}
}
return nil
})
}

// TestLeaseholderLocalRelocateNonPreferred verifies that when the leaseholder is relocated, the lease
// is transferred directly to one of the preferred nodes. This verifies https://github.com/cockroachdb/cockroach/issues/67740.
func TestLeaseholderLocalRelocateNonPreferred(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

stickyRegistry := server.NewStickyInMemEnginesRegistry()
defer stickyRegistry.CloseAllStickyInMemEngines()
ctx := context.Background()
manualClock := hlc.NewHybridManualClock()
zcfg := zonepb.DefaultZoneConfig()
zcfg.LeasePreferences = []zonepb.LeasePreference{
{
Constraints: []zonepb.Constraint{
{Type: zonepb.Constraint_REQUIRED, Key: "region", Value: "eu"},
},
},
}

serverArgs := make(map[int]base.TestServerArgs)
locality := func(region string) roachpb.Locality {
return roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "region", Value: region},
},
}
}
localities := []roachpb.Locality{
locality("eu"),
locality("eu"),
locality("us"),
locality("us"),
}

const numStores = 4
const numNodes = 4
for i := 0; i < numNodes; i++ {
serverArgs[i] = base.TestServerArgs{
Locality: localities[i],
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
ClockSource: manualClock.UnixNano,
DefaultZoneConfigOverride: &zcfg,
StickyEngineRegistry: stickyRegistry,
},
},
StoreSpecs: []base.StoreSpec{
{
InMemory: true,
StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10),
},
},
}
}
tc := testcluster.StartTestCluster(t, numNodes,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: serverArgs,
})
defer tc.Stopper().Stop(ctx)

_, rhsDesc := tc.SplitRangeOrFatal(t, keys.UserTableDataMin)
tc.AddVotersOrFatal(t, rhsDesc.StartKey.AsRawKey(), tc.Targets(1, 2)...)

// We start with having the range under test on (1,2,3).
db := tc.ServerConn(0)

// Manually move lease out of preference.
tc.TransferRangeLeaseOrFatal(t, rhsDesc, tc.Target(2))

// Check that the lease is on node 3.
leaseHolder, err := tc.FindRangeLeaseHolder(rhsDesc, nil)
require.NoError(t, err)
require.Equal(t, tc.Target(2), leaseHolder)

gossipLiveness(t, tc)

// Relocate range 3 -> 4.
_, err = db.Exec("ALTER RANGE " + rhsDesc.RangeID.String() + " RELOCATE FROM 3 TO 4")
require.NoError(t, err)

// Make sure the lease moved to node 1 or 2, not node 4.
leaseHolder, err = tc.FindRangeLeaseHolder(rhsDesc, nil)
require.NoError(t, err)
require.True(t,
leaseHolder.Equal(tc.Target(0)) || leaseHolder.Equal(tc.Target(1)))

// Double check that lease moved directly
repl := tc.GetFirstStoreFromServer(t, 3).LookupReplica(roachpb.RKey(rhsDesc.StartKey.AsRawKey()))
history := repl.GetLeaseHistory()
require.Equal(t, tc.Target(2).NodeID, history[len(history)-2].Replica.NodeID)
require.Equal(t, leaseHolder.NodeID, history[len(history)-1].Replica.NodeID)
}

// This test replicates the behavior observed in
// https://github.com/cockroachdb/cockroach/issues/62485. We verify that
// when a dc with the leaseholder is lost, a node in a dc that does not have the
@@ -686,29 +902,10 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
return nil
})

_, processError, enqueueError := tc.GetFirstStoreFromServer(t, 0).
_, _, enqueueError := tc.GetFirstStoreFromServer(t, 0).
ManuallyEnqueue(ctx, "replicate", repl, true)

require.NoError(t, enqueueError)
if processError != nil {
log.Infof(ctx, "a US replica stole lease, manually moving it to the EU.")
if !strings.Contains(processError.Error(), "does not have the range lease") {
t.Fatal(processError)
}
// The us replica ended up stealing the lease, so we need to manually
// transfer the lease and then do another run through the replicate queue
// to move it to the us.
tc.TransferRangeLeaseOrFatal(t, *repl.Desc(), tc.Target(0))
testutils.SucceedsSoon(t, func() error {
if !repl.OwnsValidLease(ctx, tc.Servers[0].Clock().NowAsClockTimestamp()) {
return errors.Errorf("Expected lease to transfer to server 0")
}
return nil
})
_, processError, enqueueError = tc.GetFirstStoreFromServer(t, 0).
ManuallyEnqueue(ctx, "replicate", repl, true)
require.NoError(t, enqueueError)
require.NoError(t, processError)
}

var newLeaseHolder roachpb.ReplicationTarget
testutils.SucceedsSoon(t, func() error {
@@ -717,22 +914,26 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
return err
})

// Check that the leaseholder is in the US
srv, err := tc.FindMemberServer(newLeaseHolder.StoreID)
require.NoError(t, err)
region, ok := srv.Locality().Find("region")
require.True(t, ok)
require.Equal(t, "us", region)
require.Equal(t, 3, len(repl.Desc().Replicas().Voters().VoterDescriptors()))

// Validate that we upreplicated outside of SF.
for _, replDesc := range repl.Desc().Replicas().Voters().VoterDescriptors() {
replicas := repl.Desc().Replicas().Voters().VoterDescriptors()
require.Equal(t, 3, len(replicas))
for _, replDesc := range replicas {
serv, err := tc.FindMemberServer(replDesc.StoreID)
require.NoError(t, err)
dc, ok := serv.Locality().Find("dc")
require.True(t, ok)
require.NotEqual(t, "sf", dc)
}
history := repl.GetLeaseHistory()

// make sure we see the eu node as a lease holder in the second to last position.
history := repl.GetLeaseHistory()
require.Equal(t, tc.Target(0).NodeID, history[len(history)-2].Replica.NodeID)
}

@@ -811,7 +1012,7 @@ func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) {

_, rhsDesc := tc.SplitRangeOrFatal(t, keys.UserTableDataMin)
tc.AddVotersOrFatal(t, rhsDesc.StartKey.AsRawKey(), tc.Targets(1, 2, 3)...)
tc.RemoveLeaseHolderOrFatal(t, rhsDesc, tc.Target(0), tc.Target(1))
tc.RemoveLeaseHolderOrFatal(t, rhsDesc, tc.Target(0))

startKeys := make([]roachpb.Key, 20)
startKeys[0] = rhsDesc.StartKey.AsRawKey()
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_metrics_test.go
@@ -337,7 +337,7 @@ func TestStoreMetrics(t *testing.T) {
// Verify stats after addition.
verifyStats(t, tc, 1, 2)
checkGauge(t, "store 0", tc.GetFirstStoreFromServer(t, 0).Metrics().ReplicaCount, initialCount+1)
tc.RemoveLeaseHolderOrFatal(t, desc, tc.Target(0), tc.Target(1))
tc.RemoveLeaseHolderOrFatal(t, desc, tc.Target(0))
testutils.SucceedsSoon(t, func() error {
_, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID)
if err == nil {
21 changes: 17 additions & 4 deletions pkg/kv/kvserver/client_raft_test.go
@@ -3800,6 +3800,7 @@ func TestReplicateReAddAfterDown(t *testing.T) {
tc.WaitForValues(t, key, []int64{16, 16, 16})
}


// TestLeaseHolderRemoveSelf verifies that a lease holder cannot remove itself
// without encountering an error.
func TestLeaseHolderRemoveSelf(t *testing.T) {
@@ -3818,11 +3819,22 @@ func TestLeaseHolderRemoveSelf(t *testing.T) {
tc.SplitRangeOrFatal(t, key)
tc.AddVotersOrFatal(t, key, tc.Target(1))

// Attempt to remove the replica from first store.
expectedErr := "invalid ChangeReplicasTrigger"
if _, err := tc.RemoveVoters(key, tc.Target(0)); !testutils.IsError(err, expectedErr) {
t.Fatalf("expected %q error trying to remove leaseholder replica; got %v", expectedErr, err)
// Remove the replica from first store.
_, err := tc.RemoveVoters(key, tc.Target(0))
require.NoError(t, err)

// Check that lease moved to server 2
leaseInfo := getLeaseInfoOrFatal(t, context.Background(), tc.Servers[1].DB(), key)
rangeDesc, err := tc.LookupRange(key)
if err != nil {
t.Fatal(err)
}
replica, ok := rangeDesc.GetReplicaDescriptor(tc.Servers[1].GetFirstStoreID())
if !ok {
t.Fatalf("expected to find replica in server 2")
}
require.Equal(t, leaseInfo.Lease.Replica, replica)
leaseHolder = tc.GetFirstStoreFromServer(t, 1)

// Expect that we can still successfully do a get on the range.
getArgs := getArgs(key)
Expand All @@ -3832,6 +3844,7 @@ func TestLeaseHolderRemoveSelf(t *testing.T) {
}
}


// TestRemovedReplicaError verifies that a replica that has been removed from a
// range returns a RangeNotFoundError if it receives a request for that range
// (not RaftGroupDeletedError, and even before the ReplicaGCQueue has run).
6 changes: 2 additions & 4 deletions pkg/kv/kvserver/client_relocate_range_test.go
@@ -236,12 +236,10 @@ func TestAdminRelocateRange(t *testing.T) {
relocateAndCheck(t, tc, k, tc.Targets(4), nil /* nonVoterTargets */)
})
}

// s5 (LH) ---> s3 (LH)
// Lateral movement while at replication factor one. In this case atomic
// replication changes cannot be used; we add-then-remove instead.
// Lateral movement while at replication factor one.
{
requireNumAtomic(0, 2, func() {
requireNumAtomic(1, 0, func() {
relocateAndCheck(t, tc, k, tc.Targets(2), nil /* nonVoterTargets */)
})
}