From 00f7b95b7b1bf0ffe9878e988b29f164abf64505 Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Fri, 8 Nov 2024 12:53:41 +0900 Subject: [PATCH] fix: flaky TestPrepareCandidate --- .../framework/preemption/preemption_test.go | 101 ++++++++++++++---- 1 file changed, 81 insertions(+), 20 deletions(-) diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go index 22473be4f8fbc..b273cda8680e3 100644 --- a/pkg/scheduler/framework/preemption/preemption_test.go +++ b/pkg/scheduler/framework/preemption/preemption_test.go @@ -22,6 +22,7 @@ import ( "fmt" "reflect" "sort" + "strings" "sync" "testing" "time" @@ -441,12 +442,16 @@ func TestPrepareCandidate(t *testing.T) { ) tests := []struct { - name string - nodeNames []string - candidate *fakeCandidate - preemptor *v1.Pod - testPods []*v1.Pod - expectedDeletedPods []string + name string + nodeNames []string + candidate *fakeCandidate + preemptor *v1.Pod + testPods []*v1.Pod + // expectedDeletedPod is the pod name that is expected to be deleted. + // + // You can set multiple pod names if there are multiple possibilities. + // Both an empty slice and "" mean no pod is expected to be deleted. + expectedDeletedPod []string expectedDeletionError bool expectedPatchError bool // Only compared when async preemption is disabled. 
@@ -457,7 +462,6 @@ func TestPrepareCandidate(t *testing.T) { }{ { name: "no victims", - candidate: &fakeCandidate{ victims: &extenderv1.Victims{}, }, @@ -485,7 +489,7 @@ func TestPrepareCandidate(t *testing.T) { victim1, }, nodeNames: []string{node1Name}, - expectedDeletedPods: []string{"victim1"}, + expectedDeletedPod: []string{"victim1"}, expectedStatus: nil, expectedPreemptingMap: sets.New(types.UID("preemptor")), }, @@ -505,7 +509,7 @@ func TestPrepareCandidate(t *testing.T) { victim1WithMatchingCondition, }, nodeNames: []string{node1Name}, - expectedDeletedPods: []string{"victim1"}, + expectedDeletedPod: []string{"victim1"}, expectedStatus: nil, expectedPreemptingMap: sets.New(types.UID("preemptor")), }, @@ -523,7 +527,7 @@ func TestPrepareCandidate(t *testing.T) { preemptor: preemptor, testPods: []*v1.Pod{}, nodeNames: []string{node1Name}, - expectedDeletedPods: []string{"victim1"}, + expectedDeletedPod: []string{"victim1"}, expectedStatus: nil, expectedPreemptingMap: sets.New(types.UID("preemptor")), }, @@ -560,7 +564,7 @@ func TestPrepareCandidate(t *testing.T) { preemptor: preemptor, testPods: []*v1.Pod{}, nodeNames: []string{node1Name}, - expectedDeletedPods: []string{"victim1"}, + expectedDeletedPod: []string{"victim1"}, expectedStatus: nil, expectedPreemptingMap: sets.New(types.UID("preemptor")), }, @@ -599,9 +603,14 @@ func TestPrepareCandidate(t *testing.T) { testPods: []*v1.Pod{ victim1, }, - nodeNames: []string{node1Name}, - expectedPatchError: true, - expectedDeletedPods: []string{"victim2"}, + nodeNames: []string{node1Name}, + expectedPatchError: true, + expectedDeletedPod: []string{ + "victim2", + // The first victim could fail before the deletion of the second victim happens, + // which results in the second victim not being deleted. 
+ "", + }, expectedStatus: framework.AsStatus(errors.New("patch pod status failed")), expectedPreemptingMap: sets.New(types.UID("preemptor")), expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor}, @@ -629,7 +638,6 @@ func TestPrepareCandidate(t *testing.T) { objs = append(objs, pod) } - requestStopper := make(chan struct{}) mu := &sync.RWMutex{} deletedPods := sets.New[string]() deletionFailure := false // whether any request to delete pod failed @@ -637,7 +645,6 @@ func TestPrepareCandidate(t *testing.T) { cs := clientsetfake.NewClientset(objs...) cs.PrependReactor("delete", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { - <-requestStopper mu.Lock() defer mu.Unlock() name := action.(clienttesting.DeleteAction).GetName() @@ -651,7 +658,6 @@ func TestPrepareCandidate(t *testing.T) { }) cs.PrependReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) { - <-requestStopper mu.Lock() defer mu.Unlock() if action.(clienttesting.PatchAction).GetName() == "fail-victim" { @@ -664,6 +670,15 @@ func TestPrepareCandidate(t *testing.T) { informerFactory := informers.NewSharedInformerFactory(cs, 0) eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: cs.EventsV1()}) fakeActivator := &fakePodActivator{activatedPods: make(map[string]*v1.Pod), mu: mu} + + // Note: NominatedPodsForNode is called at the beginning of the goroutine in any case. + // fakePodNominator can delay the response of NominatedPodsForNode until the channel is closed, + // which allows us to test the preempting map before the goroutine has done anything. 
+ requestStopper := make(chan struct{}) + nominator := &fakePodNominator{ + SchedulingQueue: internalqueue.NewSchedulingQueue(nil, informerFactory), + requestStopper: requestStopper, + } fwk, err := tf.NewFramework( ctx, registeredPlugins, "", @@ -672,7 +687,7 @@ func TestPrepareCandidate(t *testing.T) { frameworkruntime.WithInformerFactory(informerFactory), frameworkruntime.WithWaitingPods(frameworkruntime.NewWaitingPodsMap()), frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, nodes)), - frameworkruntime.WithPodNominator(internalqueue.NewSchedulingQueue(nil, informerFactory)), + frameworkruntime.WithPodNominator(nominator), frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, "test-scheduler")), frameworkruntime.WithPodActivator(fakeActivator), ) @@ -720,10 +735,15 @@ func TestPrepareCandidate(t *testing.T) { if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { mu.RLock() defer mu.RUnlock() - if !deletedPods.Equal(sets.New(tt.expectedDeletedPods...)) { - lastErrMsg = fmt.Sprintf("expected deleted pods %v, got %v", tt.expectedDeletedPods, deletedPods.UnsortedList()) + + pe.mu.Lock() + defer pe.mu.Unlock() + if len(pe.preempting) != 0 { + // The preempting map should be empty after the goroutine in all test cases. + lastErrMsg = fmt.Sprintf("expected no preempting pods, got %v", pe.preempting) return false, nil } + if tt.expectedDeletionError != deletionFailure { lastErrMsg = fmt.Sprintf("expected deletion error %v, got %v", tt.expectedDeletionError, deletionFailure) return false, nil @@ -744,6 +764,34 @@ func TestPrepareCandidate(t *testing.T) { } } + if deletedPods.Len() > 1 { + // For now, we only expect at most one pod to be deleted in all test cases. + // If we need to test multiple pods deletion, we need to update the test table definition. 
+ return false, fmt.Errorf("expected at most one pod to be deleted, got %v", deletedPods.UnsortedList()) + } + + if len(tt.expectedDeletedPod) == 0 { + if deletedPods.Len() != 0 { + // When tt.expectedDeletedPod is empty, we expect no pod to be deleted. + return false, fmt.Errorf("expected no pod to be deleted, got %v", deletedPods.UnsortedList()) + } + // nothing further to check. + return true, nil + } + + found := false + for _, podName := range tt.expectedDeletedPod { + if deletedPods.Has(podName) || + // If podName is empty, we expect no pod to be deleted. + (deletedPods.Len() == 0 && podName == "") { + found = true + } + } + if !found { + lastErrMsg = fmt.Sprintf("expected pod %v to be deleted, but %v is deleted", strings.Join(tt.expectedDeletedPod, " or "), deletedPods.UnsortedList()) + return false, nil + } + return true, nil }); err != nil { t.Fatal(lastErrMsg) @@ -753,6 +801,19 @@ func TestPrepareCandidate(t *testing.T) { } } +type fakePodNominator struct { + // embed it so that we can only override NominatedPodsForNode + internalqueue.SchedulingQueue + + // fakePodNominator doesn't respond to NominatedPodsForNode() until the channel is closed. + requestStopper chan struct{} +} + +func (f *fakePodNominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo { + <-f.requestStopper + return nil +} + type fakeExtender struct { ignorable bool errProcessPreemption bool