Skip to content

Commit

Permalink
add e2e test that re-starts the scheduler pod
Browse files Browse the repository at this point in the history
  • Loading branch information
anuraagnalluri committed Feb 14, 2022
1 parent 87050bb commit 1d20074
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 4 deletions.
12 changes: 11 additions & 1 deletion test/e2e/basic_scheduling/basic_scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ var _ = ginkgo.Describe("", func() {
Ω(err3).NotTo(HaveOccurred())
Ω(d).NotTo(BeNil())

ginkgo.By("Restart scheduler pod")
_, err4 := kClient.ScaleDeployment(configmanager.YKScheduler, 0, configmanager.YuniKornTestConfig.YkNamespace)
gomega.Ω(err4).NotTo(gomega.HaveOccurred())
err5 := kClient.WaitForPodBySelectorTerminated(configmanager.YuniKornTestConfig.YkNamespace, fmt.Sprintf("component=%s", configmanager.YKScheduler), 60)
gomega.Ω(err5).NotTo(gomega.HaveOccurred())
_, err6 := kClient.ScaleDeployment(configmanager.YKScheduler, 1, configmanager.YuniKornTestConfig.YkNamespace)
gomega.Ω(err6).NotTo(gomega.HaveOccurred())
err7 := kClient.WaitForPodBySelectorRunning(configmanager.YuniKornTestConfig.YkNamespace, fmt.Sprintf("component=%s", configmanager.YKScheduler), 10, true)
gomega.Ω(err7).NotTo(gomega.HaveOccurred())

ginkgo.By("create development namespace")
ns1, err := kClient.CreateNamespace(dev, nil)
gomega.Ω(err).NotTo(gomega.HaveOccurred())
Expand All @@ -81,7 +91,7 @@ var _ = ginkgo.Describe("", func() {
//Wait for pod to move to running state
err = kClient.WaitForPodBySelectorRunning(dev,
fmt.Sprintf("app=%s", sleepRespPod.ObjectMeta.Labels["app"]),
10)
10, false)
gomega.Ω(err).NotTo(gomega.HaveOccurred())

appsInfo, err = restClient.GetAppInfo(sleepRespPod.ObjectMeta.Labels["applicationId"])
Expand Down
40 changes: 37 additions & 3 deletions test/e2e/framework/helpers/k8s/k8s_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"log"
"os"
"os/exec"
"path/filepath"
Expand All @@ -32,6 +33,7 @@ import (

"k8s.io/apimachinery/pkg/util/wait"

scale "k8s.io/api/autoscaling/v1"
v1 "k8s.io/api/core/v1"
authv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -104,6 +106,17 @@ func (k *KubeCtl) GetKubeConfig() (*rest.Config, error) {
}
return nil, errors.New("kubeconfig is nil")
}

func (k *KubeCtl) ScaleDeployment(name string, count int32, namespace string) (*scale.Scale, error) {
deployments := k.clientSet.AppsV1().Deployments(namespace)
scale, err := deployments.GetScale(context.TODO(), name, metav1.GetOptions{})
if err != nil {
log.Fatal(err)
}
scale.Spec.Replicas = count
return deployments.UpdateScale(context.TODO(), name, scale, metav1.UpdateOptions{})
}

func (k *KubeCtl) GetPods(namespace string) (*v1.PodList, error) {
return k.clientSet.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
}
Expand Down Expand Up @@ -295,14 +308,19 @@ func (k *KubeCtl) ListPods(namespace string, selector string) (*v1.PodList, erro
}

// Wait up to timeout seconds for all pods in 'namespace' with given 'selector' to enter running state.
// Returns an error if no pods are found or not all discovered pods enter running state.
func (k *KubeCtl) WaitForPodBySelectorRunning(namespace string, selector string, timeout int) error {
// Returns an error if no pods are found when 'wait' is false or not all discovered pods enter running state within the 'timeout' duration.
// If 'wait' is true, error will not be returned if no pods are found. Pods will be continually listed until there is a non-empty list
// to iterate over.
func (k *KubeCtl) WaitForPodBySelectorRunning(namespace string, selector string, timeout int, wait bool) error {
podList, err := k.ListPods(namespace, selector)
if err != nil {
return err
}
if len(podList.Items) == 0 {
return fmt.Errorf("no pods in %s with selector %s", namespace, selector)
if !wait {
return fmt.Errorf("no pods in %s with selector %s", namespace, selector)
}
return k.WaitForPodBySelectorRunning(namespace, selector, timeout, wait)
}

for _, pod := range podList.Items {
Expand All @@ -313,6 +331,22 @@ func (k *KubeCtl) WaitForPodBySelectorRunning(namespace string, selector string,
return nil
}

// Wait up to timeout seconds for all pods in 'namespace' with given 'selector' to enter terminated state.
// Returns an error if any pods with the given 'selector' remain in 'namespace' after the 'timeout' duration.
func (k *KubeCtl) WaitForPodBySelectorTerminated(namespace string, selector string, timeout int) error {
podList, err := k.ListPods(namespace, selector)
if err != nil {
return err
}

for _, pod := range podList.Items {
if err := k.WaitForPodTerminated(namespace, pod.Name, time.Duration(timeout)*time.Second); err != nil {
return err
}
}
return nil
}

func (k *KubeCtl) CreateSecret(secret *v1.Secret, namespace string) (*v1.Secret, error) {
return k.clientSet.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{})
}
Expand Down

0 comments on commit 1d20074

Please sign in to comment.