From 732d90f2969fb1a5a0f718b0972354ec234f6e83 Mon Sep 17 00:00:00 2001 From: Aayush Shah Date: Tue, 1 Mar 2022 16:13:17 -0500 Subject: [PATCH] kvserver: don't reject raft snapshots on draining nodes Previously, draining nodes were incorrectly rejecting all snapshots -- including Raft snapshots. This meant that the replicas on those draining nodes that needed Raft snapshots to catch up would never be able to do so. This could've lead to tacit unavailability where, even in cases where all the replicas are live, if a majority is on draining nodes, the range would be stalled. Discovered in https://github.com/cockroachlabs/support/issues/1459 Release justification: bug fix Release note (bug fix): Previously, draining nodes in a cluster without shutting them down could stall foreground traffic in the cluster. This patch fixes this bug. --- pkg/kv/kvserver/replica_learner_test.go | 78 ++++++++++++++++++++++--- pkg/kv/kvserver/store_raft.go | 7 --- pkg/kv/kvserver/store_snapshot.go | 21 +++++++ 3 files changed, 90 insertions(+), 16 deletions(-) diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index e223735d34ea..4adcd04711a3 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" @@ -357,15 +358,7 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) { }) } -// TestNonVoterCatchesUpViaRaftSnapshotQueue ensures that a non-voting replica -// in need of a snapshot will receive one via the raft snapshot queue. This is -// also meant to test that a non-voting replica that is initialized via an -// `INITIAL` snapshot during its addition is not ignored by the raft snapshot -// queue for future snapshots. -func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - +func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { skip.UnderShort(t, "this test sleeps for a few seconds") var skipInitialSnapshot int64 @@ -386,6 +379,7 @@ func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) { // Disable the raft snapshot queue, we will manually queue a replica into it // below. ltk.storeKnobs.DisableRaftSnapshotQueue = true + tc := testcluster.StartTestCluster( t, 2, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{Knobs: knobs}, @@ -405,6 +399,8 @@ func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) { return err }) + // Wait until we remove the lock that prevents the raft snapshot queue from + // sending this replica a snapshot. select { case <-nonVoterSnapLockRemoved: case <-time.After(testutils.DefaultSucceedsSoonDuration): @@ -420,6 +416,15 @@ func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) { time.Sleep(kvserver.RaftLogQueuePendingSnapshotGracePeriod) + if drainReceivingNode { + // Draining nodes shouldn't reject raft snapshots, so this should have no + // effect on the outcome of this test. + const drainingServerIdx = 1 + client, err := tc.GetAdminClient(ctx, t, drainingServerIdx) + require.NoError(t, err) + drain(ctx, t, client) + } + testutils.SucceedsSoon(t, func() error { // Manually enqueue the leaseholder replica into its store's raft snapshot // queue. We expect it to pick up on the fact that the non-voter on its range @@ -445,6 +450,61 @@ func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) { require.NoError(t, g.Wait()) } +func drain(ctx context.Context, t *testing.T, client serverpb.AdminClient) { + stream, err := client.Drain(ctx, &serverpb.DrainRequest{ + DoDrain: true, + }) + require.NoError(t, err) + + // Wait until the draining node acknowledges that it's draining. + _, err = stream.Recv() + require.NoError(t, err) +} + +// TestSnapshotsToDrainingNodes tests that rebalancing snapshots to draining +// receivers are rejected, but Raft snapshots aren't. +func TestSnapshotsToDrainingNodes(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + t.Run("rebalancing snapshots", func(t *testing.T) { + ctx := context.Background() + + // We set up a 2 node test cluster with the second node marked draining. + const drainingServerIdx = 1 + const drainingNodeID = drainingServerIdx + 1 + tc := testcluster.StartTestCluster( + t, 2, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + }, + ) + defer tc.Stopper().Stop(ctx) + client, err := tc.GetAdminClient(ctx, t, drainingServerIdx) + require.NoError(t, err) + drain(ctx, t, client) + + // Now, we try to add a replica to it, we expect that to fail. + scratchKey := tc.ScratchRange(t) + _, err = tc.AddVoters(scratchKey, makeReplicationTargets(drainingNodeID)...) + require.Regexp(t, "store is draining", err) + }) + + t.Run("raft snapshots", func(t *testing.T) { + testRaftSnapshotsToNonVoters(t, true /* drainReceivingNode */) + }) +} + +// TestNonVoterCatchesUpViaRaftSnapshotQueue ensures that a non-voting replica +// in need of a snapshot will receive one via the raft snapshot queue. This is +// also meant to test that a non-voting replica that is initialized via an +// `INITIAL` snapshot during its addition is not ignored by the raft snapshot +// queue for future snapshots. +func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + testRaftSnapshotsToNonVoters(t, false /* drainReceivingNode */) +} + func TestSplitWithLearnerOrJointConfig(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 65b2eb89029f..494c66019a18 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -72,13 +72,6 @@ func (s *Store) HandleSnapshot( return s.stopper.RunTaskWithErr(ctx, name, func(ctx context.Context) error { s.metrics.RaftRcvdMessages[raftpb.MsgSnap].Inc(1) - if s.IsDraining() { - return stream.Send(&SnapshotResponse{ - Status: SnapshotResponse_DECLINED, - Message: storeDrainingMsg, - }) - } - return s.receiveSnapshot(ctx, header, stream) }) } diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index c31b68aba1ca..d588ce825a1f 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -699,6 +699,27 @@ func (s *Store) checkSnapshotOverlapLocked( func (s *Store) receiveSnapshot( ctx context.Context, header *SnapshotRequest_Header, stream incomingSnapshotStream, ) error { + // Draining nodes will generally not be rebalanced to (see the filtering that + // happens in getStoreListFromIDsLocked()), but in case they are, they should + // reject the incoming rebalancing snapshots. + if s.IsDraining() { + switch t := header.Priority; t { + case SnapshotRequest_RECOVERY: + // We can not reject Raft snapshots because draining nodes may have + // replicas in `StateSnapshot` that need to catch up. + // + // TODO(aayush): We also do not reject snapshots sent to replace dead + // replicas here, but draining stores are still filtered out in + // getStoreListFromIDsLocked(). Is that sound? Don't we want to + // upreplicate to draining nodes if there are no other candidates? + case SnapshotRequest_REBALANCE: + return sendSnapshotError(stream, errors.New(storeDrainingMsg)) + default: + // If this a new snapshot type that this cockroach version does not know + // about, we let it through. + } + } + if fn := s.cfg.TestingKnobs.ReceiveSnapshot; fn != nil { if err := fn(header); err != nil { return sendSnapshotError(stream, err)