Skip to content

Commit

Permalink
kvserver: lease transfer in JOINT configuration
Browse files Browse the repository at this point in the history
Previously:
1. Removing a leaseholder was not allowed.
2. A VOTER_INCOMING node wasn't able to accept the lease.

Because of (1), users needed to transfer the lease before removing
the leaseholder. Because of (2), when relocating a range from the
leaseholder A to a new node B, there was no possibility to transfer
the lease to B before it was fully added as VOTER. Adding it as a
voter, however, could degrade fault tolerance. For example, if A
and B are in region R1, C in region R2 and D in R3, and we had
(A, C, D), and now adding B to the cluster to replace A results in
the intermediate configuration (A, B, C, D), then the failure of R1 would
make the cluster unavailable since no quorum can be established.
Since B can't be added before A is removed, the system would
transfer the lease out to C, remove A and add B, and then transfer
the lease again to B. This resulted in a temporary migration of leases
out of their preferred region, imbalance of lease count and degraded
performance.

The PR fixes this, by (1) allowing removing the leaseholder, and
transferring the lease right before we exit the JOINT config. And (2),
allowing a VOTER_INCOMING to accept the lease.

Release note (performance improvement): Fixes a limitation which meant
that, upon adding a new node to the cluster, lease counts among existing
nodes could diverge until the new node was fully upreplicated.
  • Loading branch information
shralex committed Feb 18, 2022
1 parent f2a722f commit 208e2b4
Show file tree
Hide file tree
Showing 20 changed files with 593 additions and 238 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-68 set the active cluster version in the format '<major>.<minor>'
version version 21.2-70 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-68</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-70</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
8 changes: 7 additions & 1 deletion pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,9 @@ const (
// preserving temporary indexes, and a post-backfill merging
// processing.
MVCCIndexBackfiller

// EnableLeaseHolderRemoval enables removing a leaseholder and transferring the lease
// during joint configuration, including to VOTER_INCOMING replicas.
EnableLeaseHolderRemoval
// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -460,6 +462,10 @@ var versionsSingleton = keyedVersions{
Key: MVCCIndexBackfiller,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 68},
},
{
Key: EnableLeaseHolderRemoval,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 70},
},
// *************************************************
// Step (2): Add new versions here.
// Do not add new versions to a patch release.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions pkg/kv/kvserver/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2009,11 +2009,17 @@ func replicaIsBehind(raftStatus *raft.Status, replicaID roachpb.ReplicaID) bool
// with an empty or nil `raftStatus` (as will be the case when its called by a
// replica that is not the raft leader), we pessimistically assume that
// `replicaID` may need a snapshot.
func replicaMayNeedSnapshot(raftStatus *raft.Status, replicaID roachpb.ReplicaID) bool {
func replicaMayNeedSnapshot(raftStatus *raft.Status, replica roachpb.ReplicaDescriptor) bool {
// When adding replicas, we only move them from LEARNER to VOTER_INCOMING after
// they applied the snapshot (see initializeRaftLearners and its use in
// changeReplicasImpl).
if replica.GetType() == roachpb.VOTER_INCOMING {
return false
}
if raftStatus == nil || len(raftStatus.Progress) == 0 {
return true
}
if progress, ok := raftStatus.Progress[uint64(replicaID)]; ok {
if progress, ok := raftStatus.Progress[uint64(replica.ReplicaID)]; ok {
// We can only reasonably assume that the follower replica is not in need of
// a snapshot iff it is in `StateReplicate`. However, even this is racey
// because we can still possibly have an ill-timed log truncation between
Expand All @@ -2040,7 +2046,7 @@ func excludeReplicasInNeedOfSnapshots(

filled := 0
for _, repl := range replicas {
if replicaMayNeedSnapshot(raftStatus, repl.ReplicaID) {
if replicaMayNeedSnapshot(raftStatus, repl) {
log.VEventf(
ctx,
5,
Expand Down
212 changes: 186 additions & 26 deletions pkg/kv/kvserver/client_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"math"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -594,6 +593,182 @@ func TestLeasePreferencesRebalance(t *testing.T) {
})
}

// TestLeaseholderRelocatePreferred tests that when the leaseholder is
// relocated, the lease can be transferred directly to a new node in the
// preferred region ("us"), without first bouncing through another node.
// This verifies https://github.com/cockroachdb/cockroach/issues/67740
func TestLeaseholderRelocatePreferred(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	testLeaseholderRelocateInternal(t, "us")
}

// TestLeaseholderRelocateNonPreferred tests that when the leaseholder is
// relocated, the lease transfers to a node in the preferred region ("eu"
// here), even while another node is being added.
// This verifies https://github.com/cockroachdb/cockroach/issues/67740
func TestLeaseholderRelocateNonPreferred(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	testLeaseholderRelocateInternal(t, "eu")
}

// TestLeaseholderRelocateNonExistent tests that when the leaseholder is
// relocated, the lease still transfers to some node even though no node in
// the preferred region ("au") is available in the cluster.
// This verifies https://github.com/cockroachdb/cockroach/issues/67740
func TestLeaseholderRelocateNonExistent(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	testLeaseholderRelocateInternal(t, "au")
}

// testLeaseholderRelocateInternal relocates a range away from its leaseholder
// and verifies the lease is transferred directly to a node in preferredRegion
// when such a node exists. Five localities are declared but only numNodes (4)
// servers are started, so the lone "au" node never exists; passing "au"
// exercises the fallback path where the lease may land anywhere.
func testLeaseholderRelocateInternal(t *testing.T, preferredRegion string) {
	stickyRegistry := server.NewStickyInMemEnginesRegistry()
	defer stickyRegistry.CloseAllStickyInMemEngines()
	ctx := context.Background()
	manualClock := hlc.NewHybridManualClock()
	// Require leases to live in preferredRegion.
	zcfg := zonepb.DefaultZoneConfig()
	zcfg.LeasePreferences = []zonepb.LeasePreference{
		{
			Constraints: []zonepb.Constraint{
				{Type: zonepb.Constraint_REQUIRED, Key: "region", Value: preferredRegion},
			},
		},
	}

	serverArgs := make(map[int]base.TestServerArgs)
	locality := func(region string) roachpb.Locality {
		return roachpb.Locality{
			Tiers: []roachpb.Tier{
				{Key: "region", Value: region},
			},
		}
	}
	localities := []roachpb.Locality{
		locality("eu"),
		locality("eu"),
		locality("us"),
		locality("us"),
		locality("au"), // intentionally never started; see numNodes below
	}

	const numNodes = 4
	for i := 0; i < numNodes; i++ {
		serverArgs[i] = base.TestServerArgs{
			Locality: localities[i],
			Knobs: base.TestingKnobs{
				Server: &server.TestingKnobs{
					ClockSource: manualClock.UnixNano,
					DefaultZoneConfigOverride: &zcfg,
					StickyEngineRegistry: stickyRegistry,
				},
			},
			StoreSpecs: []base.StoreSpec{
				{
					InMemory: true,
					StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10),
				},
			},
		}
	}
	tc := testcluster.StartTestCluster(t, numNodes,
		base.TestClusterArgs{
			ReplicationMode: base.ReplicationManual,
			ServerArgsPerNode: serverArgs,
		})
	defer tc.Stopper().Stop(ctx)

	_, rhsDesc := tc.SplitRangeOrFatal(t, bootstrap.TestingUserTableDataMin())

	// We start with having the range under test on (1,2,3).
	tc.AddVotersOrFatal(t, rhsDesc.StartKey.AsRawKey(), tc.Targets(1, 2)...)

	// Make sure the lease is on node 3 (index 2).
	tc.TransferRangeLeaseOrFatal(t, rhsDesc, tc.Target(2))

	// Check that the lease moved to node 3.
	leaseHolder, err := tc.FindRangeLeaseHolder(rhsDesc, nil)
	require.NoError(t, err)
	require.Equal(t, tc.Target(2), leaseHolder)

	gossipLiveness(t, tc)

	testutils.SucceedsSoon(t, func() error {
		// Relocate replicas 3 -> 4, i.e. (1,2,3) -> (1,2,4), forcing the
		// lease off node 3.
		err = tc.Servers[2].DB().
			AdminRelocateRange(
				ctx, rhsDesc.StartKey.AsRawKey(),
				tc.Targets(0, 1, 3), nil, false)
		if err != nil {
			// Retry on transient leaseholder errors; anything else is fatal.
			require.True(t, kvserver.IsTransientLeaseholderError(err), "%v", err)
			return err
		}
		leaseHolder, err = tc.FindRangeLeaseHolder(rhsDesc, nil)
		if err != nil {
			return err
		}
		if leaseHolder.Equal(tc.Target(2)) {
			return errors.Errorf("leaseholder didn't move")
		}
		return nil
	})

	// The only node with "au" locality was never started, so the lease can
	// move anywhere; nothing further to assert.
	if preferredRegion == "au" {
		return
	}

	// Make sure the lease moved to the preferred region, if one is available.
	leaseHolder, err = tc.FindRangeLeaseHolder(rhsDesc, nil)
	require.NoError(t, err)
	require.Equal(t, locality(preferredRegion),
		localities[leaseHolder.NodeID-1])

	var leaseholderNodeID int
	if preferredRegion == "us" {
		// Node 4 (index 3) is the only "us" node in the post-relocation set.
		require.Equal(t, tc.Target(3).NodeID,
			leaseHolder.NodeID)
		leaseholderNodeID = 3
	} else {
		// Either "eu" node (index 0 or 1) is acceptable.
		if leaseHolder.NodeID == tc.Target(0).NodeID {
			leaseholderNodeID = 0
		} else {
			require.Equal(t, tc.Target(1).NodeID,
				leaseHolder.NodeID)
			leaseholderNodeID = 1
		}
	}

	// Double check that the lease moved directly: the previous entry in the
	// lease history must be the original leaseholder, node 3.
	repl := tc.GetFirstStoreFromServer(t, leaseholderNodeID).
		LookupReplica(roachpb.RKey(rhsDesc.StartKey.AsRawKey()))
	history := repl.GetLeaseHistory()
	require.Equal(t, leaseHolder.NodeID,
		history[len(history)-1].Replica.NodeID)
	require.Equal(t, tc.Target(2).NodeID,
		history[len(history)-2].Replica.NodeID)
}

// gossipLiveness heartbeats every node's liveness record and then waits until
// every store pool in the cluster has observed those heartbeats, so that
// subsequent allocator decisions consider all nodes live.
func gossipLiveness(t *testing.T, tc *testcluster.TestCluster) {
	for i := range tc.Servers {
		testutils.SucceedsSoon(t, tc.Servers[i].HeartbeatNodeLiveness)
	}
	// Make sure that all store pools have seen liveness heartbeats from everyone.
	testutils.SucceedsSoon(t, func() error {
		for i := range tc.Servers {
			for j := range tc.Servers {
				live, err := tc.GetFirstStoreFromServer(t, i).GetStoreConfig().
					StorePool.IsLive(tc.Target(j).StoreID)
				if err != nil {
					return err
				}
				if !live {
					return errors.Errorf("server %d is suspect on server %d", j, i)
				}
			}
		}
		return nil
	})
}

// This test replicates the behavior observed in
// https://github.com/cockroachdb/cockroach/issues/62485. We verify that
// when a dc with the leaseholder is lost, a node in a dc that does not have the
Expand Down Expand Up @@ -716,29 +891,10 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
return nil
})

_, processError, enqueueError := tc.GetFirstStoreFromServer(t, 0).
_, _, enqueueError := tc.GetFirstStoreFromServer(t, 0).
ManuallyEnqueue(ctx, "replicate", repl, true)

require.NoError(t, enqueueError)
if processError != nil {
log.Infof(ctx, "a US replica stole lease, manually moving it to the EU.")
if !strings.Contains(processError.Error(), "does not have the range lease") {
t.Fatal(processError)
}
// The us replica ended up stealing the lease, so we need to manually
// transfer the lease and then do another run through the replicate queue
// to move it to the us.
tc.TransferRangeLeaseOrFatal(t, *repl.Desc(), tc.Target(0))
testutils.SucceedsSoon(t, func() error {
if !repl.OwnsValidLease(ctx, tc.Servers[0].Clock().NowAsClockTimestamp()) {
return errors.Errorf("Expected lease to transfer to server 0")
}
return nil
})
_, processError, enqueueError = tc.GetFirstStoreFromServer(t, 0).
ManuallyEnqueue(ctx, "replicate", repl, true)
require.NoError(t, enqueueError)
require.NoError(t, processError)
}

var newLeaseHolder roachpb.ReplicationTarget
testutils.SucceedsSoon(t, func() error {
Expand All @@ -747,22 +903,26 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
return err
})

// Check that the leaseholder is in the US
srv, err := tc.FindMemberServer(newLeaseHolder.StoreID)
require.NoError(t, err)
region, ok := srv.Locality().Find("region")
require.True(t, ok)
require.Equal(t, "us", region)
require.Equal(t, 3, len(repl.Desc().Replicas().Voters().VoterDescriptors()))

// Validate that we upreplicated outside of SF.
for _, replDesc := range repl.Desc().Replicas().Voters().VoterDescriptors() {
replicas := repl.Desc().Replicas().Voters().VoterDescriptors()
require.Equal(t, 3, len(replicas))
for _, replDesc := range replicas {
serv, err := tc.FindMemberServer(replDesc.StoreID)
require.NoError(t, err)
dc, ok := serv.Locality().Find("dc")
require.True(t, ok)
require.NotEqual(t, "sf", dc)
}
history := repl.GetLeaseHistory()

// make sure we see the eu node as a lease holder in the second to last position.
history := repl.GetLeaseHistory()
require.Equal(t, tc.Target(0).NodeID, history[len(history)-2].Replica.NodeID)
}

Expand Down Expand Up @@ -841,7 +1001,7 @@ func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) {

_, rhsDesc := tc.SplitRangeOrFatal(t, bootstrap.TestingUserTableDataMin())
tc.AddVotersOrFatal(t, rhsDesc.StartKey.AsRawKey(), tc.Targets(1, 2, 3)...)
tc.RemoveLeaseHolderOrFatal(t, rhsDesc, tc.Target(0), tc.Target(1))
tc.RemoveLeaseHolderOrFatal(t, rhsDesc, tc.Target(0))

startKeys := make([]roachpb.Key, 20)
startKeys[0] = rhsDesc.StartKey.AsRawKey()
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func TestStoreMetrics(t *testing.T) {
// Verify stats after addition.
verifyStats(t, tc, 1, 2)
checkGauge(t, "store 0", tc.GetFirstStoreFromServer(t, 0).Metrics().ReplicaCount, initialCount+1)
tc.RemoveLeaseHolderOrFatal(t, desc, tc.Target(0), tc.Target(1))
tc.RemoveLeaseHolderOrFatal(t, desc, tc.Target(0))
testutils.SucceedsSoon(t, func() error {
_, err := tc.GetFirstStoreFromServer(t, 0).GetReplica(desc.RangeID)
if err == nil {
Expand Down
25 changes: 17 additions & 8 deletions pkg/kv/kvserver/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3826,16 +3826,25 @@ func TestLeaseHolderRemoveSelf(t *testing.T) {
})
defer tc.Stopper().Stop(ctx)

leaseHolder := tc.GetFirstStoreFromServer(t, 0)
key := []byte("a")
tc.SplitRangeOrFatal(t, key)
tc.AddVotersOrFatal(t, key, tc.Target(1))
_, desc := tc.SplitRangeOrFatal(t, bootstrap.TestingUserTableDataMin())
key := desc.StartKey.AsRawKey()
tc.AddVotersOrFatal(t, key, tc.Targets(1)...)

// Remove the replica from first store.
tc.RemoveLeaseHolderOrFatal(t, desc, tc.Target(0))

// Attempt to remove the replica from first store.
expectedErr := "invalid ChangeReplicasTrigger"
if _, err := tc.RemoveVoters(key, tc.Target(0)); !testutils.IsError(err, expectedErr) {
t.Fatalf("expected %q error trying to remove leaseholder replica; got %v", expectedErr, err)
// Check that lease moved to server 2.
leaseInfo := getLeaseInfoOrFatal(t, context.Background(), tc.Servers[1].DB(), key)
rangeDesc, err := tc.LookupRange(key)
if err != nil {
t.Fatal(err)
}
replica, ok := rangeDesc.GetReplicaDescriptor(tc.Servers[1].GetFirstStoreID())
if !ok {
t.Fatalf("expected to find replica in server 2")
}
require.Equal(t, leaseInfo.Lease.Replica, replica)
leaseHolder := tc.GetFirstStoreFromServer(t, 1)

// Expect that we can still successfully do a get on the range.
getArgs := getArgs(key)
Expand Down
Loading

0 comments on commit 208e2b4

Please sign in to comment.