From 88235266d97e7fb04a98668b9b3c733cba34407f Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Tue, 29 Oct 2024 17:16:39 +0000 Subject: [PATCH] kvserver: deflake TestSnapshotsToDrainingNodes This test was making tight assertions about the size of the snapshot that was sent. To do so, it was trying to reimplement the actual snapshot sending logic in `kvBatchSnapshotStrategy.Send()`. So these tight assertions weren't of much use -- they were asserting that we were correctly re-implementing `kvBatchSnapshotStrategy.Send()` in `getExpectedSnapshotSizeBytes`. We weren't, as evidenced by some rare flakes. This patch loosens assertions to deflake the test. Closes #133517 Release note: None --- pkg/kv/kvserver/replica_learner_test.go | 62 ++++++------------------- 1 file changed, 15 insertions(+), 47 deletions(-) diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index a643f82635a8..a58a76972edb 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -922,19 +922,6 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { // below. ltk.storeKnobs.DisableRaftSnapshotQueue = true - // Synchronize on the moment before the snapshot gets sent so we can measure - // the state at that time & gather metrics. - blockUntilSnapshotSendCh := make(chan struct{}) - blockSnapshotSendCh := make(chan struct{}) - ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSendSnapshotRequest) { - close(blockUntilSnapshotSendCh) - select { - case <-blockSnapshotSendCh: - case <-time.After(10 * time.Second): - return - } - } - tc := testcluster.StartTestCluster( t, 2, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{Knobs: knobs}, @@ -1010,46 +997,27 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { return nil }) - // Wait until the snapshot is about to be sent before calculating what the - // snapshot size should be. 
This allows our snapshot measurement to account - // for any state changes that happen between calling AddNonVoters and the - // snapshot being sent. - <-blockUntilSnapshotSendCh - store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey) - snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl) - require.NoError(t, err) - - close(blockSnapshotSendCh) + // AddNonVoter will return after the snapshot is sent. Wait for it to do so + // before asserting on snapshot sent/received metrics. require.NoError(t, g.Wait()) - - // Record the snapshot metrics for the sender after the raft snapshot was sent. + // Record metrics. senderMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 0, metrics) + receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1, metrics) - // Asserts that the raft snapshot (aka recovery snapshot) bytes sent have been - // recorded and that it was not double counted in a different metric. + // Assert that the raft snapshot (aka recovery snapshot) bytes sent have been + // recorded and that they were not double counted in the rebalancing metric. senderMapDelta := getSnapshotMetricsDiff(senderMetricsMapBefore, senderMetricsMapAfter) + require.Greater(t, senderMapDelta[".recovery"].sentBytes, int64(0)) + require.Equal(t, int64(0), senderMapDelta[".rebalancing"].sentBytes) + require.Equal(t, senderMapDelta[""], senderMapDelta[".recovery"]) - senderMapExpected := map[string]snapshotBytesMetrics{ - ".rebalancing": {sentBytes: 0, rcvdBytes: 0}, - ".recovery": {sentBytes: snapshotLength, rcvdBytes: 0}, - "": {sentBytes: snapshotLength, rcvdBytes: 0}, - } - require.Equal(t, senderMapExpected, senderMapDelta) - - // Record the snapshot metrics for the receiver after the raft snapshot was - // received. 
- receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1, metrics) - - // Asserts that the raft snapshot (aka recovery snapshot) bytes received have - // been recorded and that it was not double counted in a different metric. + // Assert that the raft snapshot (aka recovery snapshot) bytes received have + // been recorded and that they were not double counted in the rebalancing + // metric. receiverMapDelta := getSnapshotMetricsDiff(receiverMetricsMapBefore, receiverMetricsMapAfter) - - receiverMapExpected := map[string]snapshotBytesMetrics{ - ".rebalancing": {sentBytes: 0, rcvdBytes: 0}, - ".recovery": {sentBytes: 0, rcvdBytes: snapshotLength}, - "": {sentBytes: 0, rcvdBytes: snapshotLength}, - } - require.Equal(t, receiverMapExpected, receiverMapDelta) + require.Greater(t, receiverMapDelta[".recovery"].rcvdBytes, int64(0)) + require.Equal(t, int64(0), receiverMapDelta[".rebalancing"].rcvdBytes) + require.Equal(t, receiverMapDelta[""], receiverMapDelta[".recovery"]) } func drain(ctx context.Context, t *testing.T, client serverpb.AdminClient, drainingNodeID int) {