
kvserver: unskip lease preferences during outage
Previously, `TestLeasePreferencesDuringOutage` would force replication
queue processing of the test range, then assert that the range
up-replicated and the lease transferred to a preferred locality.

This test was skipped, and two of the assumptions it relied on to pass
were no longer true.

After cockroachdb#85219, the replicate queue no longer re-processes
replicas. Instead, the queue requeues replicas after processing, at the
appropriate priority. This broke the test: with the replicate queue
disabled, the requeue became a no-op.

After cockroachdb#94023, the replicate queue no longer looks for lease
transfers after processing a replication action. Combined with
cockroachdb#85219, the queue is now guaranteed not to process both
up-replication and a lease transfer from a single enqueue.
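
Taken together, the queue behavior after these two changes looks roughly
like the following (a conceptual sketch only; the types and methods are
illustrative stand-ins, not the actual queue implementation):

    // Conceptual sketch: one action per enqueue, requeue instead of
    // re-process. Illustrative code, not CockroachDB's.
    type replicateQueue struct{ disabled bool }

    func (q *replicateQueue) planAndApplyOneAction() {} // e.g. up-replicate

    // requeue hands the replica back at the appropriate priority. It is a
    // no-op while the queue is disabled, which is what broke the old test.
    func (q *replicateQueue) requeue() {
        if q.disabled {
            return
        }
        // Re-add to the priority queue here.
    }

    func (q *replicateQueue) processOneEnqueue() {
        q.planAndApplyOneAction()
        // Post-94023: no follow-up lease-transfer check happens here.
        // Post-85219: requeue rather than re-process in a loop.
        q.requeue()
    }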

Update the test to not require manual processing; instead, use a queue
range filter, which allows tests that disable automatic replication to
still process filtered ranges via the various replica queues. Also,
ensure that the non-stopped stores are considered live targets after
simulating an outage (bumping manual clocks, stopping servers), so that
the expected up-replication, then lease transfer, can proceed.
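
For illustration, this lets a test keep the replica queues disabled
cluster-wide while admitting just the range under test (a minimal sketch
built around the knob added in this commit; the surrounding scaffolding
is illustrative):

    var testRangeID int64 = -1 // set once the test range's ID is known

    knobs := base.TestingKnobs{
        Store: &kvserver.StoreTestingKnobs{
            // Ranges matching the filter are processed by the replica
            // queues even while the queues are disabled.
            BaseQueueDisabledBypassFilter: func(rangeID roachpb.RangeID) bool {
                return rangeID == roachpb.RangeID(atomic.LoadInt64(&testRangeID))
            },
        },
    }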

Fixes: cockroachdb#88769
Release note: None
kvoli committed Sep 7, 2023
1 parent 0485b7e commit 65bc225
Showing 3 changed files with 157 additions and 103 deletions.
231 changes: 132 additions & 99 deletions pkg/kv/kvserver/client_lease_test.go
@@ -16,7 +16,6 @@ import (
"math"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
@@ -909,15 +908,24 @@ func gossipLiveness(t *testing.T, tc *testcluster.TestCluster) {
}

// This test replicates the behavior observed in
// https://github.com/cockroachdb/cockroach/issues/62485. We verify that
// when a dc with the leaseholder is lost, a node in a dc that does not have the
// lease preference can steal the lease, upreplicate the range and then give up the
// lease in a single cycle of the replicate_queue.
// https://github.com/cockroachdb/cockroach/issues/62485. We verify that when a
// dc with the leaseholder is lost, a node in a dc that does not have the lease
// preference can steal the lease, upreplicate the range, and then give up the
// lease in a short period of time. Previously, the replicate queue would
// reprocess, rather than requeue, replicas. This behavior changed in #85219 to
// prevent queue priority inversion. Consequently, this test only asserts that
// the lease preferences are satisfied quickly, rather than in a single
// replicate queue process() call.
func TestLeasePreferencesDuringOutage(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 88769, "flaky test")
defer log.Scope(t).Close(t)

// This is a hefty test, so we skip it under short.
skip.UnderShort(t)
// The test has 5 nodes. It's possible under stress-race for nodes to be
// starved out heartbeating their liveness.
skip.UnderStressRace(t)

stickyRegistry := server.NewStickyVFSRegistry()
ctx := context.Background()
manualClock := hlc.NewHybridManualClock()
@@ -947,15 +955,34 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
locality("us", "mi"),
locality("us", "mi"),
}
// Disable expiration based lease transfers. It is possible that a (pseudo)
// dead node acquires the lease and we are forced to wait out the expiration
// timer, if this were not set.

// This test disables the replicate queue. We wish to enable the replicate
// queue only for the range we are testing, after marking some servers as
// dead. We also wait until the expected live stores are considered live
// from n1; if we didn't do this, it would be possible for the range to be
// processed and seen as unavailable due to manual clock jumps.
var testRangeID int64
var clockJumpMu syncutil.Mutex
atomic.StoreInt64(&testRangeID, -1)
disabledQueueBypassFn := func(rangeID roachpb.RangeID) bool {
if rangeID == roachpb.RangeID(atomic.LoadInt64(&testRangeID)) {
clockJumpMu.Lock()
defer clockJumpMu.Unlock()
return true
}
return false
}
settings := cluster.MakeTestingClusterSettings()
sv := &settings.SV
kvserver.TransferExpirationLeasesFirstEnabled.Override(ctx, sv, false)
kvserver.ExpirationLeasesOnly.Override(ctx, sv, false)
// The remaining live stores (n1,n4,n5) may become suspect due to manual
// clock jumps. Disable the suspect timer to prevent them becoming suspect
// when we bump the clocks.
liveness.TimeAfterNodeSuspect.Override(ctx, sv, 0)
timeUntilNodeDead := liveness.TimeUntilNodeDead.Get(sv)

for i := 0; i < numNodes; i++ {
serverArgs[i] = base.TestServerArgs{
Settings: settings,
Locality: localities[i],
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
@@ -967,6 +994,7 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
// The Raft leadership may not end up on the eu node, but it needs to
// be able to acquire the lease anyway.
AllowLeaseRequestProposalsWhenNotLeader: true,
BaseQueueDisabledBypassFilter: disabledQueueBypassFn,
},
},
StoreSpecs: []base.StoreSpec{
@@ -992,105 +1020,110 @@
require.NoError(t, tc.WaitForVoters(key, tc.Targets(1, 3)...))
tc.TransferRangeLeaseOrFatal(t, *repl.Desc(), tc.Target(1))

// Shutdown the sf datacenter, which is going to kill the node with the lease.
tc.StopServer(1)
tc.StopServer(2)

wait := func(duration time.Duration) {
manualClock.Increment(duration.Nanoseconds())
// Gossip and heartbeat all the live stores; we do this manually, as
// otherwise the allocator on server 0 may see everyone as temporarily
// dead due to the clock move above.
for _, i := range []int{0, 3, 4} {
require.NoError(t, tc.Servers[i].HeartbeatNodeLiveness())
require.NoError(t, tc.GetFirstStoreFromServer(t, i).GossipStore(ctx, true))
func() {
// Lock clockJumpMu to prevent the test range from being processed before
// the intended stores are considered live from n1. If we didn't do this, it
// would be possible for n1 to process the test range and find it
// unavailable (unactionable).
clockJumpMu.Lock()
defer clockJumpMu.Unlock()

// Enable queue processing of the test range, right before we stop the sf
// datacenter. We expect the test range to be enqueued into the replicate
// queue shortly after.
rangeID := repl.GetRangeID()
atomic.StoreInt64(&testRangeID, int64(rangeID))

// Shutdown the sf datacenter, which is going to kill the node with the lease.
tc.StopServer(1)
tc.StopServer(2)

wait := func(duration time.Duration) {
manualClock.Increment(duration.Nanoseconds())
// Gossip and heartbeat all the live stores; we do this manually, as
// otherwise the allocator on server 0 may see everyone as temporarily
// dead due to the clock move above.
for _, i := range []int{0, 3, 4} {
require.NoError(t, tc.Servers[i].HeartbeatNodeLiveness())
require.NoError(t, tc.GetFirstStoreFromServer(t, i).GossipStore(ctx, true))
}
}
}
// We need to wait until 2 and 3 are considered to be dead.
timeUntilNodeDead := liveness.TimeUntilNodeDead.Get(&tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().Settings.SV)
wait(timeUntilNodeDead)

checkDead := func(store *kvserver.Store, storeIdx int) error {
if dead, timetoDie, err := store.GetStoreConfig().StorePool.IsDead(
tc.GetFirstStoreFromServer(t, storeIdx).StoreID()); err != nil || !dead {
// Sometimes a gossip update arrives right after server shutdown and
// after we manually moved the time, so move it again.
if err == nil {
wait(timetoDie)
wait(timeUntilNodeDead)

checkDead := func(store *kvserver.Store, storeIdx int) error {
if dead, timetoDie, err := store.GetStoreConfig().StorePool.IsDead(
tc.GetFirstStoreFromServer(t, storeIdx).StoreID()); err != nil || !dead {
// Sometimes a gossip update arrives right after server shutdown and
// after we manually moved the time, so move it again.
if err == nil {
wait(timetoDie)
}
// NB: errors.Wrapf(nil, ...) returns nil.
// nolint:errwrap
return errors.Errorf("expected server %d to be dead, instead err=%v, dead=%v", storeIdx, err, dead)
}
// NB: errors.Wrapf(nil, ...) returns nil.
// nolint:errwrap
return errors.Errorf("expected server %d to be dead, instead err=%v, dead=%v", storeIdx, err, dead)
return nil
}
return nil
}

testutils.SucceedsSoon(t, func() error {
store := tc.GetFirstStoreFromServer(t, 0)
sl, available, _ := store.GetStoreConfig().StorePool.TestingGetStoreList()
if available != 3 {
return errors.Errorf(
"expected all 3 remaining stores to be live, but only got %d, stores=%v",
available, sl)
}
if err := checkDead(store, 1); err != nil {
return err
}
if err := checkDead(store, 2); err != nil {
return err
}
return nil
})
}()

// Send a request to force lease acquisition on _some_ remaining live node.
// Note, we expect this to be n1 (server 0).
ba := &kvpb.BatchRequest{}
ba.Add(getArgs(key))
_, pErr := tc.Servers[0].DistSenderI().(kv.Sender).Send(ctx, ba)
require.Nil(t, pErr)

testutils.SucceedsSoon(t, func() error {
store := tc.GetFirstStoreFromServer(t, 0)
sl, _, _ := store.GetStoreConfig().StorePool.TestingGetStoreList()
if len(sl.TestingStores()) != 3 {
return errors.Errorf("expected all 3 remaining stores to be live, but only got %v",
sl.TestingStores())
}
if err := checkDead(store, 1); err != nil {
return err
// Validate that we upreplicated outside of SF. NB: This will occur prior
// to the lease preference being satisfied.
require.Equal(t, 3, len(repl.Desc().Replicas().Voters().VoterDescriptors()))
for _, replDesc := range repl.Desc().Replicas().Voters().VoterDescriptors() {
serv, err := tc.FindMemberServer(replDesc.StoreID)
require.NoError(t, err)
servLocality := serv.Locality()
dc, ok := servLocality.Find("dc")
require.True(t, ok)
if dc == "sf" {
return errors.Errorf(
"expected no replicas in dc=sf, but found replica in "+
"dc=%s node_id=%v desc=%v",
dc, replDesc.NodeID, repl.Desc())
}
}
if err := checkDead(store, 2); err != nil {
// Validate that the lease also transferred to a preferred locality.
newLeaseHolder, err := tc.FindRangeLeaseHolder(*repl.Desc(), nil)
if err != nil {
return err
}
return nil
})
_, _, enqueueError := tc.GetFirstStoreFromServer(t, 0).
Enqueue(ctx, "replicate", repl, true /* skipShouldQueue */, false /* async */)

require.NoError(t, enqueueError, "failed to enqueue replica for replication")

var newLeaseHolder roachpb.ReplicationTarget
testutils.SucceedsSoon(t, func() error {
var err error
newLeaseHolder, err = tc.FindRangeLeaseHolder(*repl.Desc(), nil)
return err
})

srv, err := tc.FindMemberServer(newLeaseHolder.StoreID)
require.NoError(t, err)
loc := srv.Locality()
region, ok := loc.Find("region")
require.True(t, ok)
require.Equal(t, "us", region)
require.Equal(t, 3, len(repl.Desc().Replicas().Voters().VoterDescriptors()))
// Validate that we upreplicated outside of SF.
for _, replDesc := range repl.Desc().Replicas().Voters().VoterDescriptors() {
serv, err := tc.FindMemberServer(replDesc.StoreID)
serv, err := tc.FindMemberServer(newLeaseHolder.StoreID)
require.NoError(t, err)
memberLoc := serv.Locality()
dc, ok := memberLoc.Find("dc")
servLocality := serv.Locality()
region, ok := servLocality.Find("region")
require.True(t, ok)
require.NotEqual(t, "sf", dc)
}
history := repl.GetLeaseHistory()
// Make sure we see the eu node as a lease holder in the second to last
// leaseholder change.
// Since we can have expiration and epoch based leases at the tail of the
// history, we need to ignore them together if they originate from the same
// leaseholder.
nextNodeID := history[len(history)-1].Replica.NodeID
lastMove := len(history) - 2
for ; lastMove >= 0; lastMove-- {
if history[lastMove].Replica.NodeID != nextNodeID {
break
if region != "us" {
return errors.Errorf(
"expected leaseholder in region=us, but found region=%s node_id=%v",
region, newLeaseHolder.NodeID)
}
}
lastMove++
var leasesMsg []string
for _, h := range history {
leasesMsg = append(leasesMsg, h.String())
}
leaseHistory := strings.Join(leasesMsg, ", ")
require.Greater(t, lastMove, 0,
"must have at least one leaseholder change in history (lease history: %s)", leaseHistory)
require.Equal(t, tc.Target(0).NodeID, history[lastMove-1].Replica.NodeID,
"node id prior to last lease move (lease history: %s)", leaseHistory)
return nil
})
}

// This test verifies that when a node starts flapping its liveness, all leases
25 changes: 21 additions & 4 deletions pkg/kv/kvserver/queue.go
@@ -655,13 +655,24 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc.
}

bq.mu.Lock()
stopped := bq.mu.stopped || bq.mu.disabled
stopped := bq.mu.stopped
disabled := bq.mu.disabled
bq.mu.Unlock()

if stopped {
return
}

if disabled {
// The disabled queue bypass is used in tests that enable manual
// replication but still require specific range(s) to be processed
// through the queue.
bypassDisabled := bq.store.TestingKnobs().BaseQueueDisabledBypassFilter
if bypassDisabled == nil || !bypassDisabled(repl.GetRangeID()) {
return
}
}

if !repl.IsInitialized() {
return
}
@@ -729,10 +740,16 @@ func (bq *baseQueue) addInternal(
}

if bq.mu.disabled {
if log.V(3) {
log.Infof(ctx, "queue disabled")
// The disabled queue bypass is used in tests that enable manual
// replication but still require specific range(s) to be processed
// through the queue.
bypassDisabled := bq.store.TestingKnobs().BaseQueueDisabledBypassFilter
if bypassDisabled == nil || !bypassDisabled(desc.RangeID) {
if log.V(3) {
log.Infof(ctx, "queue disabled")
}
return false, errQueueDisabled
}
return false, errQueueDisabled
}

// If the replica is currently in purgatory, don't re-add it.
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/testing_knobs.go
@@ -493,6 +493,10 @@ type StoreTestingKnobs struct {
// required.
BaseQueueInterceptor func(ctx context.Context, bq *baseQueue)

// BaseQueueDisabledBypassFilter checks whether the replica for the given
// rangeID should ignore the queue being disabled, and be processed anyway.
BaseQueueDisabledBypassFilter func(rangeID roachpb.RangeID) bool

// InjectReproposalError injects an error in tryReproposeWithNewLeaseIndex.
// If nil is returned, reproposal will be attempted.
InjectReproposalError func(p *ProposalData) error
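
A hypothetical usage sketch, mirroring the updated test above: once the
filter admits the test range, the otherwise-disabled replicate queue
picks the range up on its own, so the test only needs to wait for the
outcome (identifiers as used in client_lease_test.go):

    // Admit the test range through the disabled queues, then wait for
    // the expected up-replication to three voters.
    atomic.StoreInt64(&testRangeID, int64(repl.GetRangeID()))
    testutils.SucceedsSoon(t, func() error {
        if got := len(repl.Desc().Replicas().Voters().VoterDescriptors()); got != 3 {
            return errors.Errorf("expected 3 voters, found %d", got)
        }
        return nil
    })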
