diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index e223735d34ea..4adcd04711a3 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" @@ -357,15 +358,7 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) { }) } -// TestNonVoterCatchesUpViaRaftSnapshotQueue ensures that a non-voting replica -// in need of a snapshot will receive one via the raft snapshot queue. This is -// also meant to test that a non-voting replica that is initialized via an -// `INITIAL` snapshot during its addition is not ignored by the raft snapshot -// queue for future snapshots. -func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - +func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { skip.UnderShort(t, "this test sleeps for a few seconds") var skipInitialSnapshot int64 @@ -386,6 +379,7 @@ func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) { // Disable the raft snapshot queue, we will manually queue a replica into it // below. ltk.storeKnobs.DisableRaftSnapshotQueue = true + tc := testcluster.StartTestCluster( t, 2, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{Knobs: knobs}, @@ -405,6 +399,8 @@ func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) { return err }) + // Wait until we remove the lock that prevents the raft snapshot queue from + // sending this replica a snapshot. 
select { case <-nonVoterSnapLockRemoved: case <-time.After(testutils.DefaultSucceedsSoonDuration): @@ -420,6 +416,15 @@ func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) { time.Sleep(kvserver.RaftLogQueuePendingSnapshotGracePeriod) + if drainReceivingNode { + // Draining nodes shouldn't reject raft snapshots, so this should have no + // effect on the outcome of this test. + const drainingServerIdx = 1 + client, err := tc.GetAdminClient(ctx, t, drainingServerIdx) + require.NoError(t, err) + drain(ctx, t, client) + } + testutils.SucceedsSoon(t, func() error { // Manually enqueue the leaseholder replica into its store's raft snapshot // queue. We expect it to pick up on the fact that the non-voter on its range @@ -445,6 +450,61 @@ func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) { require.NoError(t, g.Wait()) } +func drain(ctx context.Context, t *testing.T, client serverpb.AdminClient) { + stream, err := client.Drain(ctx, &serverpb.DrainRequest{ + DoDrain: true, + }) + require.NoError(t, err) + + // Wait until the draining node acknowledges that it's draining. + _, err = stream.Recv() + require.NoError(t, err) +} + +// TestSnapshotsToDrainingNodes tests that rebalancing snapshots to draining +// receivers are rejected, but Raft snapshots aren't. +func TestSnapshotsToDrainingNodes(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + t.Run("rebalancing snapshots", func(t *testing.T) { + ctx := context.Background() + + // We set up a 2 node test cluster with the second node marked draining. + const drainingServerIdx = 1 + const drainingNodeID = drainingServerIdx + 1 + tc := testcluster.StartTestCluster( + t, 2, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + }, + ) + defer tc.Stopper().Stop(ctx) + client, err := tc.GetAdminClient(ctx, t, drainingServerIdx) + require.NoError(t, err) + drain(ctx, t, client) + + // Now, we try to add a replica to it, we expect that to fail. 
+ scratchKey := tc.ScratchRange(t) + _, err = tc.AddVoters(scratchKey, makeReplicationTargets(drainingNodeID)...) + require.Regexp(t, "store is draining", err) + }) + + t.Run("raft snapshots", func(t *testing.T) { + testRaftSnapshotsToNonVoters(t, true /* drainReceivingNode */) + }) +} + +// TestNonVoterCatchesUpViaRaftSnapshotQueue ensures that a non-voting replica +// in need of a snapshot will receive one via the raft snapshot queue. This is +// also meant to test that a non-voting replica that is initialized via an +// `INITIAL` snapshot during its addition is not ignored by the raft snapshot +// queue for future snapshots. +func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + testRaftSnapshotsToNonVoters(t, false /* drainReceivingNode */) +} + func TestSplitWithLearnerOrJointConfig(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 65b2eb89029f..494c66019a18 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -72,13 +72,6 @@ func (s *Store) HandleSnapshot( return s.stopper.RunTaskWithErr(ctx, name, func(ctx context.Context) error { s.metrics.RaftRcvdMessages[raftpb.MsgSnap].Inc(1) - if s.IsDraining() { - return stream.Send(&SnapshotResponse{ - Status: SnapshotResponse_DECLINED, - Message: storeDrainingMsg, - }) - } - return s.receiveSnapshot(ctx, header, stream) }) } diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index c31b68aba1ca..d588ce825a1f 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -699,6 +699,27 @@ func (s *Store) checkSnapshotOverlapLocked( func (s *Store) receiveSnapshot( ctx context.Context, header *SnapshotRequest_Header, stream incomingSnapshotStream, ) error { + // Draining nodes will generally not be rebalanced to (see the filtering that + // happens in 
getStoreListFromIDsLocked()), but in case they are, they should + // reject the incoming rebalancing snapshots. + if s.IsDraining() { + switch t := header.Priority; t { + case SnapshotRequest_RECOVERY: + // We cannot reject Raft snapshots because draining nodes may have + // replicas in `StateSnapshot` that need to catch up. + // + // TODO(aayush): We also do not reject snapshots sent to replace dead + // replicas here, but draining stores are still filtered out in + // getStoreListFromIDsLocked(). Is that sound? Don't we want to + // upreplicate to draining nodes if there are no other candidates? + case SnapshotRequest_REBALANCE: + return sendSnapshotError(stream, errors.New(storeDrainingMsg)) + default: + // If this is a new snapshot type that this cockroach version does not + // know about, we let it through. + } + } + if fn := s.cfg.TestingKnobs.ReceiveSnapshot; fn != nil { if err := fn(header); err != nil { return sendSnapshotError(stream, err)