Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stabilize ingressip controller unit test #20352

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 36 additions & 64 deletions pkg/service/controller/ingressip/service_ingressip_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apimachinery/pkg/util/wait"
informers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
kcoreclient "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -44,30 +44,6 @@ func newController(t *testing.T, client *fake.Clientset, stopCh <-chan struct{})
return controller
}

func controllerSetup(t *testing.T, startingObjects []runtime.Object, stopCh <-chan struct{}) (*fake.Clientset, *watch.FakeWatcher, *IngressIPController) {
client := fake.NewSimpleClientset(startingObjects...)

fakeWatch := watch.NewFake()
client.PrependWatchReactor("*", clientgotesting.DefaultWatchReactor(fakeWatch, nil))

client.PrependReactor("create", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
obj := action.(clientgotesting.CreateAction).GetObject()
fakeWatch.Add(obj)
return true, obj, nil
})

// Ensure that updates the controller makes are passed through to the watcher.
client.PrependReactor("update", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
obj := action.(clientgotesting.CreateAction).GetObject()
fakeWatch.Modify(obj)
return true, obj, nil
})

controller := newController(t, client, stopCh)

return client, fakeWatch, controller
}

func newService(name, ip string, typeLoadBalancer bool) *v1.Service {
serviceType := v1.ServiceTypeClusterIP
if typeLoadBalancer {
Expand Down Expand Up @@ -599,51 +575,47 @@ func TestBasicControllerFlow(t *testing.T) {
newService("lb-unallocated", "", true),
}

stopChannel := make(chan struct{})
defer close(stopChannel)
stop := make(chan struct{})
defer close(stop)

_, fakeWatch, controller := controllerSetup(t, startingObjects, stopChannel)

updated := make(chan bool)
deleted := make(chan bool)

controller.changeHandler = func(change *serviceChange) error {
defer func() {
if len(change.key) == 0 {
deleted <- true
} else if change.oldService != nil {
updated <- true
client := fake.NewSimpleClientset(startingObjects...)
controller := newController(t, client, stop)
controller.changeHandler = controller.processChange

go controller.Run(stop)

// Wait for the service spec and status to be updated with an allocated IP.
err := wait.Poll(25*time.Millisecond, 2*time.Second, func() (done bool, err error) {
for _, genericAction := range client.Actions() {
switch action := genericAction.(type) {
case clientgotesting.UpdateAction:
service, ok := action.GetObject().(*v1.Service)
if ok && len(service.Spec.ExternalIPs) > 0 && len(service.Status.LoadBalancer.Ingress) > 0 {
return true, nil
}
}
}()

err := controller.processChange(change)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

return err
return false, nil
})
if err != nil {
t.Fatalf("failed waiting for update: %v", err)
}

go controller.Run(stopChannel)
client.CoreV1().Services(namespace).Delete("lb-unallocated", &metav1.DeleteOptions{})

waitForUpdate := func(msg string) {
t.Logf("waiting for: %v", msg)
select {
case <-updated:
case <-time.After(time.Duration(30 * time.Second)):
t.Fatalf("failed to see: %v", msg)
// Wait for a delete to be persisted.
err = wait.Poll(25*time.Millisecond, 2*time.Second, func() (done bool, err error) {
for _, genericAction := range client.Actions() {
switch action := genericAction.(type) {
case clientgotesting.DeleteAction:
if action.GetName() == "lb-unallocated" {
return true, nil
}
}
}
}

waitForUpdate("spec update")
waitForUpdate("status update")

fakeWatch.Delete(startingObjects[0])

t.Log("waiting for the service to be deleted")
select {
case <-deleted:
case <-time.After(time.Duration(30 * time.Second)):
t.Fatalf("failed to see expected service deletion")
return false, nil
})
if err != nil {
t.Fatalf("failed waiting for delete: %v", err)
}
}