release-21.2: kvserver: don't reject raft snapshots on draining nodes #77490

Merged
78 changes: 69 additions & 9 deletions pkg/kv/kvserver/replica_learner_test.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
@@ -357,15 +358,7 @@ func TestLearnerSnapshotFailsRollback(t *testing.T) {
})
}

// TestNonVoterCatchesUpViaRaftSnapshotQueue ensures that a non-voting replica
// in need of a snapshot will receive one via the raft snapshot queue. This is
// also meant to test that a non-voting replica that is initialized via an
// `INITIAL` snapshot during its addition is not ignored by the raft snapshot
// queue for future snapshots.
func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
skip.UnderShort(t, "this test sleeps for a few seconds")

var skipInitialSnapshot int64
@@ -386,6 +379,7 @@ func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) {
// Disable the raft snapshot queue; we will manually queue a replica into it
// below.
ltk.storeKnobs.DisableRaftSnapshotQueue = true

tc := testcluster.StartTestCluster(
t, 2, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: knobs},
@@ -405,6 +399,8 @@ func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) {
return err
})

// Wait until we remove the lock that prevents the raft snapshot queue from
// sending this replica a snapshot.
select {
case <-nonVoterSnapLockRemoved:
case <-time.After(testutils.DefaultSucceedsSoonDuration):
@@ -420,6 +416,15 @@ func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) {

time.Sleep(kvserver.RaftLogQueuePendingSnapshotGracePeriod)

if drainReceivingNode {
// Draining nodes shouldn't reject raft snapshots, so this should have no
// effect on the outcome of this test.
const drainingServerIdx = 1
client, err := tc.GetAdminClient(ctx, t, drainingServerIdx)
require.NoError(t, err)
drain(ctx, t, client)
}

testutils.SucceedsSoon(t, func() error {
// Manually enqueue the leaseholder replica into its store's raft snapshot
// queue. We expect it to pick up on the fact that the non-voter on its range
@@ -445,6 +450,61 @@ func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) {
require.NoError(t, g.Wait())
}

func drain(ctx context.Context, t *testing.T, client serverpb.AdminClient) {
stream, err := client.Drain(ctx, &serverpb.DrainRequest{
DoDrain: true,
})
require.NoError(t, err)

// Wait until the draining node acknowledges that it's draining.
_, err = stream.Recv()
require.NoError(t, err)
}

// TestSnapshotsToDrainingNodes tests that rebalancing snapshots to draining
// receivers are rejected, but Raft snapshots aren't.
func TestSnapshotsToDrainingNodes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

t.Run("rebalancing snapshots", func(t *testing.T) {
ctx := context.Background()

// We set up a 2-node test cluster with the second node marked draining.
const drainingServerIdx = 1
const drainingNodeID = drainingServerIdx + 1
tc := testcluster.StartTestCluster(
t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
},
)
defer tc.Stopper().Stop(ctx)
client, err := tc.GetAdminClient(ctx, t, drainingServerIdx)
require.NoError(t, err)
drain(ctx, t, client)

// Now, we try to add a replica to it; we expect that to fail.
scratchKey := tc.ScratchRange(t)
_, err = tc.AddVoters(scratchKey, makeReplicationTargets(drainingNodeID)...)
require.Regexp(t, "store is draining", err)
})

t.Run("raft snapshots", func(t *testing.T) {
testRaftSnapshotsToNonVoters(t, true /* drainReceivingNode */)
})
}

// TestNonVoterCatchesUpViaRaftSnapshotQueue ensures that a non-voting replica
// in need of a snapshot will receive one via the raft snapshot queue. This is
// also meant to test that a non-voting replica that is initialized via an
// `INITIAL` snapshot during its addition is not ignored by the raft snapshot
// queue for future snapshots.
func TestNonVoterCatchesUpViaRaftSnapshotQueue(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
testRaftSnapshotsToNonVoters(t, false /* drainReceivingNode */)
}

func TestSplitWithLearnerOrJointConfig(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
7 changes: 0 additions & 7 deletions pkg/kv/kvserver/store_raft.go
@@ -72,13 +72,6 @@ func (s *Store) HandleSnapshot(
return s.stopper.RunTaskWithErr(ctx, name, func(ctx context.Context) error {
s.metrics.RaftRcvdMessages[raftpb.MsgSnap].Inc(1)

if s.IsDraining() {
return stream.Send(&SnapshotResponse{
Status: SnapshotResponse_DECLINED,
Message: storeDrainingMsg,
})
}

return s.receiveSnapshot(ctx, header, stream)
})
}
21 changes: 21 additions & 0 deletions pkg/kv/kvserver/store_snapshot.go
@@ -699,6 +699,27 @@ func (s *Store) checkSnapshotOverlapLocked(
func (s *Store) receiveSnapshot(
ctx context.Context, header *SnapshotRequest_Header, stream incomingSnapshotStream,
) error {
// Draining nodes will generally not be rebalanced to (see the filtering that
// happens in getStoreListFromIDsLocked()), but in case they are, they should
// reject the incoming rebalancing snapshots.
if s.IsDraining() {
switch t := header.Priority; t {
case SnapshotRequest_RECOVERY:
// We cannot reject Raft snapshots because draining nodes may have
// replicas in `StateSnapshot` that need to catch up.
//
// TODO(aayush): We also do not reject snapshots sent to replace dead
// replicas here, but draining stores are still filtered out in
// getStoreListFromIDsLocked(). Is that sound? Don't we want to
// upreplicate to draining nodes if there are no other candidates?
case SnapshotRequest_REBALANCE:
return sendSnapshotError(stream, errors.New(storeDrainingMsg))
default:
// If this is a new snapshot type that this cockroach version does not know
// about, we let it through.
}
}

if fn := s.cfg.TestingKnobs.ReceiveSnapshot; fn != nil {
if err := fn(header); err != nil {
return sendSnapshotError(stream, err)
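
For reference, the gating added to receiveSnapshot can be summarized by the standalone sketch below. It is an illustration only, not the real kvserver code: SnapshotPriority and shouldRejectSnapshot are made-up stand-ins for the priority carried in SnapshotRequest_Header and the switch above; only the decision logic is meant to match.

package main

import "fmt"

// SnapshotPriority is an illustrative stand-in for the snapshot request
// priority in the header (the real type is a protobuf enum).
type SnapshotPriority int

const (
	PriorityUnknown   SnapshotPriority = iota // a priority this version does not know about
	PriorityRecovery                          // Raft snapshots for replicas that need to catch up
	PriorityRebalance                         // up-replication / rebalancing snapshots
)

// shouldRejectSnapshot mirrors the decision a draining store makes: decline
// rebalancing snapshots, but keep accepting Raft (recovery) snapshots and any
// priority the current version does not recognize.
func shouldRejectSnapshot(draining bool, p SnapshotPriority) bool {
	if !draining {
		return false
	}
	switch p {
	case PriorityRecovery:
		return false // draining replicas may still need to catch up via Raft
	case PriorityRebalance:
		return true // declined with storeDrainingMsg
	default:
		return false // unknown snapshot types are let through
	}
}

func main() {
	fmt.Println(shouldRejectSnapshot(true, PriorityRebalance)) // true
	fmt.Println(shouldRejectSnapshot(true, PriorityRecovery))  // false
}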