kv: additional testing for snapshot priorities
Add testing for snapshot priorities. The test is currently disabled due
to issue cockroachdb#87553; however, it is useful to run it manually for
now, and once that issue is fixed it will guard against regressions
related to snapshot priorities.

Release note: None
Epic: none
andrewbaptist committed Jan 24, 2023
1 parent b21379b commit 4093981
Showing 6 changed files with 285 additions and 101 deletions.
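As the commit message notes, the new test is meant to be run manually while issue cockroachdb#87553 remains open. A minimal sketch of doing so with the standard Go tooling (the repository's own dev tooling may use a different invocation):

// In pkg/kv/kvserver/replica_learner_test.go, temporarily comment out the skip:
//
//	// skip.WithIssue(t, 87553)
//
// then run the test directly:
//
//	go test ./pkg/kv/kvserver -run TestSnapshotPriorityAndCancellation -v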
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_split_test.go
@@ -3316,7 +3316,7 @@ func TestSplitTriggerMeetsUnexpectedReplicaID(t *testing.T) {
atomic.StoreInt32(&skipSnaps, 0)
}
knobs := base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
- ReplicaSkipInitialSnapshot: func() bool {
+ ReplicaSkipInitialSnapshot: func(desc roachpb.ReplicaDescriptor) bool {
return atomic.LoadInt32(&skipSnaps) != 0
},
RaftSnapshotQueueSkipReplica: func() bool {
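The knob change above threads the target replica's descriptor through ReplicaSkipInitialSnapshot, so a test can skip INITIAL snapshots selectively rather than all-or-nothing. A minimal sketch of the selective form, mirroring how the new test later in this diff uses it (base, kvserver, and roachpb imports as in the surrounding files):

knobs := base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
	// Skip the INITIAL snapshot only for non-voters; voters are still
	// snapshotted normally.
	ReplicaSkipInitialSnapshot: func(desc roachpb.ReplicaDescriptor) bool {
		return desc.Type == roachpb.NON_VOTER
	},
}}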
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/replica_command.go
@@ -1804,7 +1804,7 @@ func (r *Replica) initializeRaftLearners(
return nil, errors.Errorf("programming error: cannot promote replica of type %s", rDesc.Type)
}

- if fn := r.store.cfg.TestingKnobs.ReplicaSkipInitialSnapshot; fn != nil && fn() {
+ if fn := r.store.cfg.TestingKnobs.ReplicaSkipInitialSnapshot; fn != nil && fn(rDesc) {
continue
}

@@ -2791,7 +2791,11 @@ func (r *Replica) followerSendSnapshot(

// Throttle snapshot sending.
rangeSize := r.GetMVCCStats().Total()
- cleanup, err := r.store.reserveSendSnapshot(ctx, req, rangeSize)
+ cleanup, err := r.store.throttleSnapshot(ctx, true,
+ req.SenderQueueName, req.SenderQueuePriority,
+ rangeSize,
+ req.RangeID, req.DelegatedSender.ReplicaID,
+ )
if err != nil {
return err
}
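Above, reserveSendSnapshot is replaced by the more general throttleSnapshot, keyed on the sender's queue name and priority so that throttled snapshots can presumably be ordered by priority. A sketch of the call shape; the meaning of the boolean argument is inferred from the isSend flag on the new throttle knobs, and the deferred cleanup is an assumption rather than the committed code:

cleanup, err := r.store.throttleSnapshot(
	ctx,
	true, // assumed: the send side, matching the knobs' isSend flag
	req.SenderQueueName, req.SenderQueuePriority, // presumably order waiting snapshots
	rangeSize,
	req.RangeID, req.DelegatedSender.ReplicaID,
)
if err != nil {
	return err
}
defer cleanup() // assumed: releases the reservation once the send completes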
222 changes: 215 additions & 7 deletions pkg/kv/kvserver/replica_learner_test.go
@@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
@@ -256,12 +257,15 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) {
return nil
}
ltk.storeKnobs.ThrottleEmptySnapshots = true
- ltk.storeKnobs.BeforeSendSnapshotThrottle = func() {
- atomic.AddInt64(&count, 1)
- }
- ltk.storeKnobs.AfterSendSnapshotThrottle = func() {
- atomic.AddInt64(&count, -1)
- }
+ ltk.storeKnobs.BeforeSnapshotThrottle =
+ func(isSend bool) {
+ atomic.AddInt64(&count, 1)
+ }
+ ltk.storeKnobs.AfterSnapshotThrottle =
+ func(isSend bool, storeId roachpb.StoreID, requestSource kvserverpb.SnapshotRequest_QueueName) error {
+ atomic.AddInt64(&count, -1)
+ return nil
+ }
ctx := context.Background()
tc := testcluster.StartTestCluster(
t, 3, base.TestClusterArgs{
@@ -485,7 +489,7 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {

// Set it up such that the newly added non-voter will not receive its INITIAL
// snapshot.
- ltk.storeKnobs.ReplicaSkipInitialSnapshot = func() bool {
+ ltk.storeKnobs.ReplicaSkipInitialSnapshot = func(descriptor roachpb.ReplicaDescriptor) bool {
return atomic.LoadInt64(&skipInitialSnapshot) == 1
}
// Synchronize with the removal of the "best effort" lock on log truncation.
@@ -1906,3 +1910,207 @@ func TestRebalancingSnapshotMetrics(t *testing.T) {
require.Equal(t, receiverTotalExpected, receiverTotalDelta)
require.Equal(t, receiverMapExpected, receiverMapDelta)
}

// SnapshotPriorityStateMu holds shared state for a test that sends different
// snapshots to the same recipient with different queue classes.
type SnapshotPriorityStateMu struct {
syncutil.Mutex
initialized bool
numSnapshots int
keys [2]roachpb.Key
desc [2]roachpb.RangeDescriptor
leaseholderRepl [2]*kvserver.Replica
}

func TestSnapshotPriorityAndCancellation(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 87553)

var stateMu SnapshotPriorityStateMu

var storeKnobs kvserver.StoreTestingKnobs
ctx := context.Background()

// Make sure all snapshots are throttled.
storeKnobs.ThrottleEmptySnapshots = true
// Disable the raft snapshot queue; we will manually queue a replica into it
// below to control ordering.
storeKnobs.DisableRaftSnapshotQueue = true

// Set up replica creation such that a newly added NON-voter will not receive
// its INITIAL snapshot.
storeKnobs.ReplicaSkipInitialSnapshot = func(descriptor roachpb.ReplicaDescriptor) bool {
stateMu.Lock()
defer stateMu.Unlock()
// Don't do any enforcement until stateMu is set up.
if !stateMu.initialized {
return false
}

// We only block the snapshot for non-voters, since this happens only once
// during the test and is easier to control.
if descriptor.Type == roachpb.NON_VOTER {
fmt.Println("non voter received: ", descriptor.StoreID, descriptor.ReplicaID)
return true
}
fmt.Println("voter: ", descriptor.StoreID, descriptor.ReplicaID)
return false
}

storeKnobs.AfterSnapshotThrottle =
func(isSend bool, storeId roachpb.StoreID, requestSource kvserverpb.SnapshotRequest_QueueName) error {
stateMu.Lock()
// Don't count or fail any snapshots until the test is ready to run.
if !stateMu.initialized {
stateMu.Unlock()
return nil
}

stateMu.numSnapshots++
stateMu.Unlock()

// Only fail receives for this test.
if isSend {
return nil
}
// Only fail on storeId 2.
if storeId != 2 {
return nil
}

if requestSource == kvserverpb.SnapshotRequest_OTHER {
return errors.New("simulated receive failure")
}
return nil
}

tc := testcluster.StartTestCluster(
t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: &storeKnobs}},
ReplicationMode: base.ReplicationManual,
},
)
defer tc.Stopper().Stop(ctx)

// Send the snapshots through 3 different mechanisms.
// * 0 = Raft snapshot queue
// * 1 = Replicate queue
// * 2 = Direct AdminChangeReplicas call (expected to fail; currently disabled)
leaseholderStore := tc.GetFirstStoreFromServer(t, 0)
key := tc.ScratchRange(t)

stateMu.Lock()
stateMu.initialized = true
for i := 0; i < len(stateMu.keys); i++ {
stateMu.keys[i] = key
stateMu.desc[i] = tc.LookupRangeOrFatal(t, key)
key = key.Next()
_, _ = tc.SplitRangeOrFatal(t, key)

var err error
stateMu.leaseholderRepl[i], err = leaseholderStore.GetReplica(stateMu.desc[i].RangeID)
require.NoError(t, err)
}
stateMu.Unlock()

g, ctx := errgroup.WithContext(ctx)

// Add a non-voting replica with initialization blocked. Note that
// `tc.AddNonVoters` will not return until the newly added non-voter is
// initialized, which we will do below via the raft snapshot queue.
g.Go(func() error {
_, err := tc.AddNonVoters(stateMu.keys[0], tc.Target(1))
return err
})
// Send replica 1 to store 2, so the replicate queue will try to add store 1.
// The replicate queue won't create an even number of voters, so it is
// necessary to go through this extra step. This succeeds without blocking.
rangeDesc, err := tc.AddVoters(stateMu.keys[1], tc.Target(2))
require.NoError(t, err)
stateMu.Lock()
stateMu.desc[1] = rangeDesc
stateMu.Unlock()

// Send these two snapshots first; they must complete before the
// AdminChangeReplicas attempt below.
sendRaftQueueSnapshot(t, ctx, 0, &stateMu, leaseholderStore)
sendReplicateQueueSnapshot(t, ctx, 1, &stateMu, leaseholderStore)

// This will fail with an error; make sure that at the end the permits are
// still OK. This needs to call AdminChangeReplicas directly so as not to
// block waiting for AddVoters.
/*
_, err = tc.Servers[0].DB().AdminChangeReplicas(
ctx, stateMu.keys[2], stateMu.desc[2],
roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)))
require.Error(t, err)
*/

require.NoError(t, g.Wait())

// Make sure that we freed all reservations as expected.
for i := 0; i < 3; i++ {
require.Equal(t, 0, tc.GetFirstStoreFromServer(t, i).ReservationCount())
}
// require.Fail(t, "force logs")

}

// Manually enqueue the leaseholder replica into its store's raft snapshot
// queue. We expect it to pick up on the fact that the non-voter on its range
// needs a snapshot.
func sendRaftQueueSnapshot(
t *testing.T,
ctx context.Context,
idx int,
stateMu *SnapshotPriorityStateMu,
leaseholderStore *kvserver.Store,
) {
testutils.SucceedsSoon(t, func() error {
stateMu.Lock()
repl := stateMu.leaseholderRepl[idx]
stateMu.Unlock()
recording, pErr, err := leaseholderStore.Enqueue(
ctx,
"raftsnapshot",
repl,
false, /* skipShouldQueue */
false, /* async */
)
require.NoError(t, pErr)
require.NoError(t, err)
matched, err := regexp.MatchString("streamed VIA_SNAPSHOT_QUEUE snapshot.*to.*NON_VOTER", recording.String())
if err != nil {
return err
}
if !matched {
return errors.Errorf("the raft snapshot queue did not send a snapshot to the non-voter")
}
return nil
})
}

// Manually enqueue the leaseholder replica into its store's replicate queue.
// We expect it to notice that the range needs another voter and send the
// corresponding snapshot.
func sendReplicateQueueSnapshot(
t *testing.T,
ctx context.Context,
idx int,
stateMu *SnapshotPriorityStateMu,
leaseholderStore *kvserver.Store,
) {
testutils.SucceedsSoon(t, func() error {
stateMu.Lock()
repl := stateMu.leaseholderRepl[idx]
stateMu.Unlock()
_, pErr, err := leaseholderStore.Enqueue(
ctx,
"replicate",
repl,
false, /* skipShouldQueue */
false, /* async */
)
require.NoError(t, pErr)
require.NoError(t, err)
return nil
})
}
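For reference, the test above exercises the unified snapshot-throttle knobs that replace the former BeforeSendSnapshotThrottle/AfterSendSnapshotThrottle pair. A minimal sketch of wiring them, using only names that appear in this diff, to fail receive-side throttling for snapshots without a recognized sender queue:

var storeKnobs kvserver.StoreTestingKnobs
// Fires for both send- and receive-side throttling; isSend tells them apart.
storeKnobs.BeforeSnapshotThrottle = func(isSend bool) {}
// Returning an error simulates a failed reservation on the receiving store.
storeKnobs.AfterSnapshotThrottle = func(
	isSend bool, storeId roachpb.StoreID, requestSource kvserverpb.SnapshotRequest_QueueName,
) error {
	if !isSend && requestSource == kvserverpb.SnapshotRequest_OTHER {
		return errors.New("simulated receive failure")
	}
	return nil
}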