Skip to content

Commit

Permalink
kvserver: deflake TestSnapshotsToDrainingNodes
Browse files Browse the repository at this point in the history
This test was making tight assertions about the size of the snapshot
that was sent. To do so, it was trying to reimplement the actual
snapshot sending logic in `kvBatchSnapshotStrategy.Send()`. So these
tight assertions weren't of much use -- they were asserting that we
were correctly re-implementing `kvBatchSnapshotStrategy.Send()` in
`getExpectedSnapshotSizeBytes`. We weren't, as evidenced by some rare
flakes.

This patch loosens assertions to deflake the test.

Closes #133517
Release note: None
  • Loading branch information
arulajmani committed Oct 29, 2024
1 parent b15abb9 commit d006115
Showing 1 changed file with 15 additions and 47 deletions.
62 changes: 15 additions & 47 deletions pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,19 +924,6 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
// below.
ltk.storeKnobs.DisableRaftSnapshotQueue = true

// Synchronize on the moment before the snapshot gets sent so we can measure
// the state at that time & gather metrics.
blockUntilSnapshotSendCh := make(chan struct{})
blockSnapshotSendCh := make(chan struct{})
ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSendSnapshotRequest) {
close(blockUntilSnapshotSendCh)
select {
case <-blockSnapshotSendCh:
case <-time.After(10 * time.Second):
return
}
}

tc := testcluster.StartTestCluster(
t, 2, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: knobs},
Expand Down Expand Up @@ -1012,46 +999,27 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
return nil
})

// Wait until the snapshot is about to be sent before calculating what the
// snapshot size should be. This allows our snapshot measurement to account
// for any state changes that happen between calling AddNonVoters and the
// snapshot being sent.
<-blockUntilSnapshotSendCh
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl)
require.NoError(t, err)

close(blockSnapshotSendCh)
// AddNonVoter will return after the snapshot is sent. Wait for it to do so
// before checking asserting on snapshot sent/received metrics.
require.NoError(t, g.Wait())

// Record the snapshot metrics for the sender after the raft snapshot was sent.
// Record metrics.
senderMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 0, metrics)
receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1, metrics)

// Asserts that the raft snapshot (aka recovery snapshot) bytes sent have been
// recorded and that it was not double counted in a different metric.
// Assert that the raft snapshot (aka recovery snapshot) bytes sent have been
// recorded and that they were not double counted in the rebalancing metric.
senderMapDelta := getSnapshotMetricsDiff(senderMetricsMapBefore, senderMetricsMapAfter)
require.Greater(t, senderMapDelta[".recovery"].sentBytes, int64(0))
require.Equal(t, int64(0), senderMapDelta[".rebalancing"].sentBytes)
require.Equal(t, senderMapDelta[""], senderMapDelta[".recovery"])

senderMapExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: 0},
".recovery": {sentBytes: snapshotLength, rcvdBytes: 0},
"": {sentBytes: snapshotLength, rcvdBytes: 0},
}
require.Equal(t, senderMapExpected, senderMapDelta)

// Record the snapshot metrics for the receiver after the raft snapshot was
// received.
receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1, metrics)

// Asserts that the raft snapshot (aka recovery snapshot) bytes received have
// been recorded and that it was not double counted in a different metric.
// Assert that the raft snapshot (aka recovery snapshot) bytes received have
// been recorded and that they were not double counted in the rebalancing
// metric.
receiverMapDelta := getSnapshotMetricsDiff(receiverMetricsMapBefore, receiverMetricsMapAfter)

receiverMapExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: 0},
".recovery": {sentBytes: 0, rcvdBytes: snapshotLength},
"": {sentBytes: 0, rcvdBytes: snapshotLength},
}
require.Equal(t, receiverMapExpected, receiverMapDelta)
require.Greater(t, receiverMapDelta[".recovery"].rcvdBytes, int64(0))
require.Equal(t, int64(0), receiverMapDelta[".rebalancing"].rcvdBytes)
require.Equal(t, receiverMapDelta[""], receiverMapDelta[".recovery"])
}

func drain(ctx context.Context, t *testing.T, client serverpb.AdminClient, drainingNodeID int) {
Expand Down

0 comments on commit d006115

Please sign in to comment.