diff --git a/tests/network_test.go b/tests/network_test.go
new file mode 100644
index 00000000..31b3f969
--- /dev/null
+++ b/tests/network_test.go
@@ -0,0 +1,103 @@
+package tests
+
+import (
+    "errors"
+    "fmt"
+    "github.com/mesosphere/kudo-spark-operator/tests/utils"
+    log "github.com/sirupsen/logrus"
+    v12 "k8s.io/api/core/v1"
+    "testing"
+)
+
+/*
+    Test that `hostNetwork` in SparkApplication propagates to driver and executor pods
+*/
+func TestHostNetworkPropagation(t *testing.T) {
+    spark := utils.SparkOperatorInstallation{}
+    err := spark.InstallSparkOperator()
+    defer spark.CleanUp()
+
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    var testCases = []struct {
+        driverHN   bool
+        executorHN bool
+    }{
+        {false, false},
+        {true, false},
+        {false, true},
+        {true, true},
+    }
+
+    for i, tc := range testCases {
+        log.Infof("Running test case:\n- driver host network:\t\t%v\n- executor host network:\t%v", tc.driverHN, tc.executorHN)
+        jobName := fmt.Sprintf("host-network-test-job-%d", i)
+        job := utils.SparkJob{
+            Name:     jobName,
+            Template: "spark-mock-task-runner-job-host-network.yaml",
+            Params: map[string]interface{}{
+                "args":                []string{"1", "600"},
+                "driverHostNetwork":   tc.driverHN,
+                "executorHostNetwork": tc.executorHN,
+            },
+        }
+
+        // Submit the job and wait for it to start
+        err = spark.SubmitAndWaitForExecutors(&job)
+        if err != nil {
+            t.Fatal(err)
+        }
+
+        // Verify driver pod hostNetwork and dnsPolicy values
+        driver, err := spark.DriverPod(job)
+        if err != nil {
+            t.Fatal(err)
+        }
+        log.Infof("Verifying driver %s spec values", driver.Name)
+        err = verifyPodHostNetwork(driver, tc.driverHN)
+        if err != nil {
+            t.Fatal(err)
+        }
+
+        // Verify executor pods hostNetwork and dnsPolicy values
+        executors, err := spark.ExecutorPods(job)
+        if err != nil {
+            t.Fatal(err)
+        }
+        for _, executor := range executors {
+            log.Infof("Verifying executor %s spec values", executor.Name)
+            err = verifyPodHostNetwork(&executor, tc.executorHN)
+            if err != nil {
+                t.Fatal(err)
+            }
+        }
+
+        // Terminate the job while it's running
+        spark.DeleteJob(job)
+    }
+}
+
+func verifyPodHostNetwork(pod *v12.Pod, expectedHostNetwork bool) error {
+    log.Infof("Pod spec.hostNetwork: %v", pod.Spec.HostNetwork)
+    log.Infof("Pod spec.dnsPolicy: %v", pod.Spec.DNSPolicy)
+
+    // Check spec values
+    if pod.Spec.HostNetwork != expectedHostNetwork {
+        return errors.New(fmt.Sprintf("Unexpected hostNetwork value for pod %s: %v. Should be %v", pod.Name, pod.Spec.HostNetwork, expectedHostNetwork))
+    } else if expectedHostNetwork && pod.Spec.DNSPolicy != v12.DNSClusterFirstWithHostNet {
+        return errors.New(fmt.Sprintf("Expected pod DNS policy to be \"ClusterFirstWithHostNet\", but it's %s", pod.Spec.DNSPolicy))
+    }
+
+    // Check pod IP
+    log.Infof("Pod status.podIP: %v", pod.Status.PodIP)
+    log.Infof("Pod status.hostIP: %v", pod.Status.HostIP)
+    if expectedHostNetwork && pod.Status.PodIP != pod.Status.HostIP {
+        return errors.New(fmt.Sprintf("Pod %s IP doesn't match the host IP", pod.Name))
+    } else if !expectedHostNetwork && pod.Status.PodIP == pod.Status.HostIP {
+        return errors.New(fmt.Sprintf("Pod %s IP matches the host IP", pod.Name))
+    }
+
+    return nil
+}
diff --git a/tests/smoke_test.go b/tests/smoke_test.go
index d0dad0c6..3bdd9364 100644
--- a/tests/smoke_test.go
+++ b/tests/smoke_test.go
@@ -3,7 +3,6 @@ package tests
 import (
     "errors"
     "fmt"
-    "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
     "github.com/mesosphere/kudo-spark-operator/tests/utils"
     log "github.com/sirupsen/logrus"
     v12 "k8s.io/api/core/v1"
@@ -27,33 +26,16 @@ func TestShuffleAppDriverOutput(t *testing.T) {
     jobName := "shuffle-app"
     job := utils.SparkJob{
-        Name:     jobName,
-        Template: "spark-shuffle-job.yaml",
+        Name:           jobName,
+        Template:       "spark-shuffle-job.yaml",
+        ExecutorsCount: expectedExecutorCount,
         Params: map[string]interface{}{
-            "executor_count": expectedExecutorCount,
-            "args":           []string{"4", strconv.Itoa(expectedGroupCount), "100", "4", "1500"},
+            "args": []string{"4", strconv.Itoa(expectedGroupCount), "100", "4", "1500"},
         },
     }
 
-    err = spark.SubmitJob(&job)
-    if err != nil {
-        t.Fatal(err)
-    }
-
-    err = spark.WaitForJobState(job, v1beta2.RunningState)
-    if err != nil {
-        t.Fatal(err)
-    }
-
-    // Wait for correct number of executors to show up
-    err = utils.Retry(func() error {
-        executors, err := spark.GetExecutorState(job)
-        if err != nil {
-            return err
-        } else if len(executors) != expectedExecutorCount {
-            return errors.New(fmt.Sprintf("The number of executors is %d, but %d is expected", len(executors), expectedExecutorCount))
-        }
-        return nil
-    })
+    // Submit the job and wait for it to start
+    err = spark.SubmitAndWaitForExecutors(&job)
     if err != nil {
         t.Fatal(err)
     }
@@ -81,32 +63,12 @@ func TestRunningAppDeletion(t *testing.T) {
             "args": []string{"1", "600"},
         },
     }
-    expectedExecutorCount := 1
 
     // Submit the job and wait for it to start
-    err = spark.SubmitJob(&job)
+    err = spark.SubmitAndWaitForExecutors(&job)
     if err != nil {
         t.Fatal(err)
     }
-    err = spark.WaitForJobState(job, v1beta2.RunningState)
-    if err != nil {
-        t.Fatal(err)
-    }
-
-    // Wait for correct number of executors to show up
-
-    err = utils.Retry(func() error {
-        executors, err := spark.GetExecutorState(job)
-        if err != nil {
-            return err
-        } else if len(executors) != expectedExecutorCount {
-            return errors.New(fmt.Sprintf("The number of executors is %d, but %d is expected", len(executors), expectedExecutorCount))
-        }
-        return nil
-    })
-    if err != nil {
-        t.Error(err)
-    }
 
     // Terminate the job while it's running
     spark.DeleteJob(job)
diff --git a/tests/templates/spark-linear-regression-history-server-job.yaml b/tests/templates/spark-linear-regression-history-server-job.yaml
index ff18f229..0209be7b 100644
--- a/tests/templates/spark-linear-regression-history-server-job.yaml
+++ b/tests/templates/spark-linear-regression-history-server-job.yaml
@@ -46,7 +46,7 @@ spec:
     serviceAccount: {{ .ServiceAccount }}
   executor:
     cores: 1
-    instances: 1
+    instances: {{ .ExecutorsCount }}
     memory: "512m"
     labels:
       version: {{ .SparkVersion }}
diff --git a/tests/templates/spark-linear-regression-job.yaml b/tests/templates/spark-linear-regression-job.yaml
index f990d610..b3fa3ae4 100644
--- a/tests/templates/spark-linear-regression-job.yaml
+++ b/tests/templates/spark-linear-regression-job.yaml
@@ -35,7 +35,7 @@ spec:
     serviceAccount: {{ .ServiceAccount }}
   executor:
     cores: 1
-    instances: 1
+    instances: {{ .ExecutorsCount }}
     memory: "512m"
     labels:
       version: {{ .SparkVersion }}
diff --git a/tests/templates/spark-mock-task-runner-job-host-network.yaml b/tests/templates/spark-mock-task-runner-job-host-network.yaml
new file mode 100644
index 00000000..600bb1c5
--- /dev/null
+++ b/tests/templates/spark-mock-task-runner-job-host-network.yaml
@@ -0,0 +1,36 @@
+apiVersion: "sparkoperator.k8s.io/v1beta2"
+kind: SparkApplication
+metadata:
+  name: {{ .Name }}
+  namespace: {{ .Namespace }}
+spec:
+  type: Scala
+  mode: cluster
+  image: {{ .Image }}
+  imagePullPolicy: Always
+  mainClass: MockTaskRunner
+  mainApplicationFile: "https://infinity-artifacts.s3.amazonaws.com/scale-tests/dcos-spark-scala-tests-assembly-2.4.0-20190325.jar"
+  arguments: {{ range $i, $arg := index .Params "args" }}
+    - "{{ $arg }}"{{ end }}
+  sparkConf:
+    "spark.scheduler.maxRegisteredResourcesWaitingTime": "2400s"
+    "spark.scheduler.minRegisteredResourcesRatio": "1.0"
+  sparkVersion: {{ .SparkVersion }}
+  restartPolicy:
+    type: Never
+  driver:
+    cores: 1
+    memory: "512m"
+    hostNetwork: {{ index .Params "driverHostNetwork" }}
+    labels:
+      version: {{ .SparkVersion }}
+      metrics-exposed: "true"
+    serviceAccount: {{ .ServiceAccount }}
+  executor:
+    cores: 1
+    instances: {{ .ExecutorsCount }}
+    memory: "512m"
+    hostNetwork: {{ index .Params "executorHostNetwork" }}
+    labels:
+      version: {{ .SparkVersion }}
+      metrics-exposed: "true"
diff --git a/tests/templates/spark-mock-task-runner-job.yaml b/tests/templates/spark-mock-task-runner-job.yaml
index 717dba58..af5cbee5 100644
--- a/tests/templates/spark-mock-task-runner-job.yaml
+++ b/tests/templates/spark-mock-task-runner-job.yaml
@@ -38,7 +38,7 @@ spec:
 {{- end }}
   executor:
     cores: 1
-    instances: 1
+    instances: {{ .ExecutorsCount }}
     memory: "512m"
     labels:
       version: {{ .SparkVersion }}
diff --git a/tests/templates/spark-pi.yaml b/tests/templates/spark-pi.yaml
index af051654..2c4f01e1 100644
--- a/tests/templates/spark-pi.yaml
+++ b/tests/templates/spark-pi.yaml
@@ -24,7 +24,7 @@ spec:
     serviceAccount: {{ .ServiceAccount }}
   executor:
     cores: 1
-    instances: 1
+    instances: {{ .ExecutorsCount }}
     memory: "512m"
     labels:
       version: {{ .SparkVersion }}
diff --git a/tests/templates/spark-shuffle-job.yaml b/tests/templates/spark-shuffle-job.yaml
index 3c39cbd2..55bb1ee3 100644
--- a/tests/templates/spark-shuffle-job.yaml
+++ b/tests/templates/spark-shuffle-job.yaml
@@ -27,7 +27,7 @@ spec:
     serviceAccount: {{ .ServiceAccount }}
   executor:
     cores: 1
-    instances: {{ index .Params "executor_count" }}
+    instances: {{ .ExecutorsCount }}
     memory: "512m"
     labels:
       version: {{ .SparkVersion }}
diff --git a/tests/utils/job.go b/tests/utils/job.go
index 56ec6504..af2e90d0 100644
--- a/tests/utils/job.go
+++ b/tests/utils/job.go
@@ -2,9 +2,12 @@ package utils
 
 import (
     "errors"
-    "os"
-
+    "fmt"
+    "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
     log "github.com/sirupsen/logrus"
+    v12 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "os"
 )
 
 type SparkJob struct {
@@ -15,6 +18,8 @@ type SparkJob struct {
     Template       string
     ServiceAccount string
     Params         map[string]interface{}
+    Drivers        int
+    ExecutorsCount int
 }
 
 func (spark *SparkOperatorInstallation) SubmitJob(job *SparkJob) error {
@@ -32,6 +37,9 @@ func (spark *SparkOperatorInstallation) SubmitJob(job *SparkJob) error {
     if job.ServiceAccount == "" {
         job.ServiceAccount = spark.InstanceName + DefaultServiceAccountSuffix
     }
+    if job.ExecutorsCount == 0 {
+        job.ExecutorsCount = 1
+    }
 
     yamlFile := createSparkJob(*job)
     defer os.Remove(yamlFile)
@@ -41,6 +49,23 @@
     return err
 }
 
+func (spark *SparkOperatorInstallation) DriverPod(job SparkJob) (*v12.Pod, error) {
+    pod, err := spark.K8sClients.CoreV1().Pods(job.Namespace).Get(DriverPodName(job.Name), v1.GetOptions{})
+    return pod, err
+}
+
+func (spark *SparkOperatorInstallation) ExecutorPods(job SparkJob) ([]v12.Pod, error) {
+    pods, err := spark.K8sClients.CoreV1().Pods(job.Namespace).List(v1.ListOptions{
+        LabelSelector: fmt.Sprintf("spark-role=executor,sparkoperator.k8s.io/app-name=%s", job.Name),
+    })
+
+    if err != nil {
+        return nil, err
+    }
+
+    return pods.Items, nil
+}
+
 func (spark *SparkOperatorInstallation) DriverLog(job SparkJob) (string, error) {
     driverPodName := DriverPodName(job.Name)
     return getPodLog(spark.K8sClients, job.Namespace, driverPodName, 0)
@@ -51,6 +76,31 @@ func (spark *SparkOperatorInstallation) DriverLogContains(job SparkJob, text str
     return podLogContains(spark.K8sClients, job.Namespace, driverPodName, text)
 }
 
+func (spark *SparkOperatorInstallation) SubmitAndWaitForExecutors(job *SparkJob) error {
+    // Submit the job and wait for it to start
+    err := spark.SubmitJob(job)
+    if err != nil {
+        return err
+    }
+
+    err = spark.WaitForJobState(*job, v1beta2.RunningState)
+    if err != nil {
+        return err
+    }
+
+    // Wait for correct number of executors to show up
+    err = Retry(func() error {
+        executors, err := spark.GetExecutorState(*job)
+        if err != nil {
+            return err
+        } else if len(executors) != job.ExecutorsCount {
+            return errors.New(fmt.Sprintf("The number of executors is %d, but %d is expected", len(executors), job.ExecutorsCount))
+        }
+        return nil
+    })
+    return err
+}
+
 func (spark *SparkOperatorInstallation) WaitForOutput(job SparkJob, text string) error {
     log.Infof("Waiting for the following text to appear in the driver log: %s", text)
     err := Retry(func() error {