
Merge pull request #77490 from aayushshah15/backport21.2-77246
aayushshah15 authored Mar 8, 2022
2 parents 08814ae + 732d90f commit 37dee54
Showing 3 changed files with 90 additions and 16 deletions.
78 changes: 69 additions & 9 deletions pkg/kv/kvserver/replica_learner_test.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
@@ -357,15 +358,7 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) {
})
}

// TestNonVoterCatchesUpViaRaftSnapshotQueue ensures that a non-voting replica
// in need of a snapshot will receive one via the raft snapshot queue. This is
// also meant to test that a non-voting replica that is initialized via an
// `INITIAL` snapshot during its addition is not ignored by the raft snapshot
// queue for future snapshots.
func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

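// testRaftSnapshotsToNonVoters is a helper shared by the tests below. It adds
// a non-voting replica in need of a snapshot and verifies that it receives one
// via the raft snapshot queue, optionally with the receiving node marked as
// draining (drainReceivingNode).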
func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
skip.UnderShort(t, "this test sleeps for a few seconds")

var skipInitialSnapshot int64
@@ -386,6 +379,7 @@ func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) {
// Disable the raft snapshot queue; we will manually queue a replica into it
// below.
ltk.storeKnobs.DisableRaftSnapshotQueue = true

tc := testcluster.StartTestCluster(
t, 2, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: knobs},
@@ -405,6 +399,8 @@ func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) {
return err
})

// Wait until we remove the lock that prevents the raft snapshot queue from
// sending this replica a snapshot.
select {
case <-nonVoterSnapLockRemoved:
case <-time.After(testutils.DefaultSucceedsSoonDuration):
@@ -420,6 +416,15 @@ func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) {

time.Sleep(kvserver.RaftLogQueuePendingSnapshotGracePeriod)

if drainReceivingNode {
// Draining nodes shouldn't reject raft snapshots, so this should have no
// effect on the outcome of this test.
const drainingServerIdx = 1
client, err := tc.GetAdminClient(ctx, t, drainingServerIdx)
require.NoError(t, err)
drain(ctx, t, client)
}

testutils.SucceedsSoon(t, func() error {
// Manually enqueue the leaseholder replica into its store's raft snapshot
// queue. We expect it to pick up on the fact that the non-voter on its range
@@ -445,6 +450,61 @@ func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) {
require.NoError(t, g.Wait())
}

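// drain sends a Drain request to the given node's admin client and blocks
// until the node acknowledges that it has started draining.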
func drain(ctx context.Context, t *testing.T, client serverpb.AdminClient) {
stream, err := client.Drain(ctx, &serverpb.DrainRequest{
DoDrain: true,
})
require.NoError(t, err)

// Wait until the draining node acknowledges that it's draining.
_, err = stream.Recv()
require.NoError(t, err)
}

// TestSnapshotsToDrainingNodes tests that rebalancing snapshots to draining
// receivers are rejected, but Raft snapshots aren't.
func TestSnapshotsToDrainingNodes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

t.Run("rebalancing snapshots", func(t *testing.T) {
ctx := context.Background()

// We set up a 2-node test cluster with the second node marked as draining.
const drainingServerIdx = 1
const drainingNodeID = drainingServerIdx + 1
tc := testcluster.StartTestCluster(
t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
},
)
defer tc.Stopper().Stop(ctx)
client, err := tc.GetAdminClient(ctx, t, drainingServerIdx)
require.NoError(t, err)
drain(ctx, t, client)

// Now, we try to add a replica to the draining node and expect that to fail.
scratchKey := tc.ScratchRange(t)
_, err = tc.AddVoters(scratchKey, makeReplicationTargets(drainingNodeID)...)
require.Regexp(t, "store is draining", err)
})

t.Run("raft snapshots", func(t *testing.T) {
testRaftSnapshotsToNonVoters(t, true /* drainReceivingNode */)
})
}

// TestNonVoterCatchesUpViaRaftSnapshotQueue ensures that a non-voting replica
// in need of a snapshot will receive one via the raft snapshot queue. This is
// also meant to test that a non-voting replica that is initialized via an
// `INITIAL` snapshot during its addition is not ignored by the raft snapshot
// queue for future snapshots.
func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
testRaftSnapshotsToNonVoters(t, false /* drainReceivingNode */)
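// The drainReceivingNode=true variant of this helper is exercised by
// TestSnapshotsToDrainingNodes above.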
}

func TestSplitWithLearnerOrJointConfig(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
7 changes: 0 additions & 7 deletions pkg/kv/kvserver/store_raft.go
@@ -72,13 +72,6 @@ func (s *Store) HandleSnapshot(
return s.stopper.RunTaskWithErr(ctx, name, func(ctx context.Context) error {
s.metrics.RaftRcvdMessages[raftpb.MsgSnap].Inc(1)

if s.IsDraining() {
return stream.Send(&SnapshotResponse{
Status: SnapshotResponse_DECLINED,
Message: storeDrainingMsg,
})
}
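// Note: the draining check removed here now happens inside
// (*Store).receiveSnapshot (see pkg/kv/kvserver/store_snapshot.go below),
// where it can distinguish rebalancing snapshots, which draining stores
// reject, from raft snapshots, which they must still accept.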

return s.receiveSnapshot(ctx, header, stream)
})
}
21 changes: 21 additions & 0 deletions pkg/kv/kvserver/store_snapshot.go
@@ -699,6 +699,27 @@ func (s *Store) checkSnapshotOverlapLocked(
func (s *Store) receiveSnapshot(
ctx context.Context, header *SnapshotRequest_Header, stream incomingSnapshotStream,
) error {
// Draining nodes will generally not be rebalanced to (see the filtering that
// happens in getStoreListFromIDsLocked()), but in case they are, they should
// reject the incoming rebalancing snapshots.
if s.IsDraining() {
switch t := header.Priority; t {
case SnapshotRequest_RECOVERY:
// We cannot reject Raft snapshots because draining nodes may have
// replicas in `StateSnapshot` that need to catch up.
//
// TODO(aayush): We also do not reject snapshots sent to replace dead
// replicas here, but draining stores are still filtered out in
// getStoreListFromIDsLocked(). Is that sound? Don't we want to
// upreplicate to draining nodes if there are no other candidates?
case SnapshotRequest_REBALANCE:
return sendSnapshotError(stream, errors.New(storeDrainingMsg))
default:
// If this is a new snapshot type that this cockroach version does not know
// about, we let it through.
}
}

if fn := s.cfg.TestingKnobs.ReceiveSnapshot; fn != nil {
if err := fn(header); err != nil {
return sendSnapshotError(stream, err)
