Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
DCOS-60860 - Test host network support (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
rpalaznik authored Nov 19, 2019
1 parent a08394b commit 6dee888
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 52 deletions.
103 changes: 103 additions & 0 deletions tests/network_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package tests

import (
"errors"
"fmt"
"github.com/mesosphere/kudo-spark-operator/tests/utils"
log "github.com/sirupsen/logrus"
v12 "k8s.io/api/core/v1"
"testing"
)

/*
Test that `hostNetwork` in SparkApplication propagates to driver and executor pods
*/
/*
Test that `hostNetwork` in SparkApplication propagates to driver and executor pods.
Runs all four combinations of driver/executor host networking and verifies the
resulting pod specs and IPs via verifyPodHostNetwork.
*/
func TestHostNetworkPropagation(t *testing.T) {
	spark := utils.SparkOperatorInstallation{}
	err := spark.InstallSparkOperator()
	defer spark.CleanUp()

	if err != nil {
		t.Fatal(err)
	}

	// Every combination of hostNetwork on/off for driver and executors.
	var testCases = []struct {
		driverHN   bool
		executorHN bool
	}{
		{false, false},
		{true, false},
		{false, true},
		{true, true},
	}

	for i, tc := range testCases {
		log.Infof("Running test case:\n- driver host network:\t\t%v\n- executor host network:\t%v", tc.driverHN, tc.executorHN)
		jobName := fmt.Sprintf("host-network-test-job-%d", i)
		job := utils.SparkJob{
			Name:     jobName,
			Template: "spark-mock-task-runner-job-host-network.yaml",
			Params: map[string]interface{}{
				"args":                []string{"1", "600"},
				"driverHostNetwork":   tc.driverHN,
				"executorHostNetwork": tc.executorHN,
			},
		}

		// Submit the job and wait for it to start
		err = spark.SubmitAndWaitForExecutors(&job)
		if err != nil {
			t.Fatal(err)
		}

		// Verify driver pod hostNetwork and dnsPolicy values.
		// Fix: log before verifying (was logged after the check), matching the
		// executor loop below.
		driver, err := spark.DriverPod(job)
		if err != nil {
			t.Fatal(err)
		}
		log.Infof("Verifying driver %s spec values", driver.Name)
		err = verifyPodHostNetwork(driver, tc.driverHN)
		if err != nil {
			t.Fatal(err)
		}

		// Verify executor pods hostNetwork and dnsPolicy values
		executors, err := spark.ExecutorPods(job)
		if err != nil {
			t.Fatal(err)
		}
		for _, executor := range executors {
			log.Infof("Verifying executor %s spec values", executor.Name)
			err = verifyPodHostNetwork(&executor, tc.executorHN)
			if err != nil {
				t.Fatal(err)
			}
		}

		// Terminate the job while it's running
		spark.DeleteJob(job)
	}
}

// verifyPodHostNetwork checks that a pod's host-networking configuration matches
// the expected setting:
//   - spec.hostNetwork must equal expectedHostNetwork
//   - when host networking is enabled, spec.dnsPolicy must be ClusterFirstWithHostNet
//   - status.podIP must equal status.hostIP if and only if host networking is enabled
//
// Returns nil when all checks pass, or a descriptive error otherwise.
func verifyPodHostNetwork(pod *v12.Pod, expectedHostNetwork bool) error {
	log.Infof("Pod spec.hostNetwork: %v", pod.Spec.HostNetwork)
	log.Infof("Pod spec.dnspolicy: %v", pod.Spec.DNSPolicy)

	// Check spec values.
	// Fix: the pod name and hostNetwork value were swapped in the format
	// arguments (the bool was printed as the pod name and vice versa).
	if pod.Spec.HostNetwork != expectedHostNetwork {
		return errors.New(fmt.Sprintf("Unexpected hostNetwork value for pod %s: %v. Should be %v", pod.Name, pod.Spec.HostNetwork, expectedHostNetwork))
	} else if expectedHostNetwork && pod.Spec.DNSPolicy != v12.DNSClusterFirstWithHostNet {
		// Fix: duplicated word "pod pod" removed from the error message.
		return errors.New(fmt.Sprintf("Expected pod DNS policy to be \"dnsClusterFirstWithHostNet\", but it's %s", pod.Spec.DNSPolicy))
	}

	// Check pod IP: with host networking the pod shares the node's IP.
	log.Infof("Pod status.podIP: %v", pod.Status.PodIP)
	log.Infof("Pod status.hostIP: %v", pod.Status.HostIP)
	if expectedHostNetwork && pod.Status.PodIP != pod.Status.HostIP {
		return errors.New(fmt.Sprintf("Pod %s IP doesn't match the host IP", pod.Name))
	} else if !expectedHostNetwork && pod.Status.PodIP == pod.Status.HostIP {
		return errors.New(fmt.Sprintf("Pod %s IP matches the host IP", pod.Name))
	}

	return nil
}
52 changes: 7 additions & 45 deletions tests/smoke_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package tests
import (
"errors"
"fmt"
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
"github.com/mesosphere/kudo-spark-operator/tests/utils"
log "github.com/sirupsen/logrus"
v12 "k8s.io/api/core/v1"
Expand All @@ -27,33 +26,16 @@ func TestShuffleAppDriverOutput(t *testing.T) {

jobName := "shuffle-app"
job := utils.SparkJob{
Name: jobName,
Template: "spark-shuffle-job.yaml",
Name: jobName,
Template: "spark-shuffle-job.yaml",
ExecutorsCount: expectedExecutorCount,
Params: map[string]interface{}{
"executor_count": expectedExecutorCount,
"args": []string{"4", strconv.Itoa(expectedGroupCount), "100", "4", "1500"},
"args": []string{"4", strconv.Itoa(expectedGroupCount), "100", "4", "1500"},
},
}

err = spark.SubmitJob(&job)
if err != nil {
t.Fatal(err)
}
err = spark.WaitForJobState(job, v1beta2.RunningState)
if err != nil {
t.Fatal(err)
}

// Wait for correct number of executors to show up
err = utils.Retry(func() error {
executors, err := spark.GetExecutorState(job)
if err != nil {
return err
} else if len(executors) != expectedExecutorCount {
return errors.New(fmt.Sprintf("The number of executors is %d, but %d is expected", len(executors), expectedExecutorCount))
}
return nil
})
// Submit the job and wait for it to start
err = spark.SubmitAndWaitForExecutors(&job)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -81,32 +63,12 @@ func TestRunningAppDeletion(t *testing.T) {
"args": []string{"1", "600"},
},
}
expectedExecutorCount := 1

// Submit the job and wait for it to start
err = spark.SubmitJob(&job)
err = spark.SubmitAndWaitForExecutors(&job)
if err != nil {
t.Fatal(err)
}
err = spark.WaitForJobState(job, v1beta2.RunningState)
if err != nil {
t.Fatal(err)
}

// Wait for correct number of executors to show up

err = utils.Retry(func() error {
executors, err := spark.GetExecutorState(job)
if err != nil {
return err
} else if len(executors) != expectedExecutorCount {
return errors.New(fmt.Sprintf("The number of executors is %d, but %d is expected", len(executors), expectedExecutorCount))
}
return nil
})
if err != nil {
t.Error(err)
}

// Terminate the job while it's running
spark.DeleteJob(job)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ spec:
serviceAccount: {{ .ServiceAccount }}
executor:
cores: 1
instances: 1
instances: {{ .ExecutorsCount }}
memory: "512m"
labels:
version: {{ .SparkVersion }}
2 changes: 1 addition & 1 deletion tests/templates/spark-linear-regression-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ spec:
serviceAccount: {{ .ServiceAccount }}
executor:
cores: 1
instances: 1
instances: {{ .ExecutorsCount }}
memory: "512m"
labels:
version: {{ .SparkVersion }}
36 changes: 36 additions & 0 deletions tests/templates/spark-mock-task-runner-job-host-network.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# SparkApplication template used by TestHostNetworkPropagation.
# The "driverHostNetwork" / "executorHostNetwork" params toggle hostNetwork
# on the driver and executor pods respectively; "args" feeds MockTaskRunner.
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: {{ .Name }}
  namespace: {{ .Namespace }}
spec:
  type: Scala
  mode: cluster
  image: {{ .Image }}
  imagePullPolicy: Always
  mainClass: MockTaskRunner
  mainApplicationFile: "https://infinity-artifacts.s3.amazonaws.com/scale-tests/dcos-spark-scala-tests-assembly-2.4.0-20190325.jar"
  arguments: {{ range $i, $arg := index .Params "args" }}
    - "{{ $arg }}"{{ end }}
  sparkConf:
    "spark.scheduler.maxRegisteredResourcesWaitingTime": "2400s"
    "spark.scheduler.minRegisteredResourcesRatio": "1.0"
  sparkVersion: {{ .SparkVersion }}
  restartPolicy:
    type: Never
  driver:
    cores: 1
    memory: "512m"
    hostNetwork: {{ index .Params "driverHostNetwork" }}
    labels:
      version: {{ .SparkVersion }}
      metrics-exposed: "true"
    serviceAccount: {{ .ServiceAccount }}
  executor:
    cores: 1
    instances: {{ .ExecutorsCount }}
    memory: "512m"
    hostNetwork: {{ index .Params "executorHostNetwork" }}
    labels:
      version: {{ .SparkVersion }}
      metrics-exposed: "true"
2 changes: 1 addition & 1 deletion tests/templates/spark-mock-task-runner-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ spec:
{{- end }}
executor:
cores: 1
instances: 1
instances: {{ .ExecutorsCount }}
memory: "512m"
labels:
version: {{ .SparkVersion }}
Expand Down
2 changes: 1 addition & 1 deletion tests/templates/spark-pi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ spec:
serviceAccount: {{ .ServiceAccount }}
executor:
cores: 1
instances: 1
instances: {{ .ExecutorsCount }}
memory: "512m"
labels:
version: {{ .SparkVersion }}
2 changes: 1 addition & 1 deletion tests/templates/spark-shuffle-job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ spec:
serviceAccount: {{ .ServiceAccount }}
executor:
cores: 1
instances: {{ index .Params "executor_count" }}
instances: {{ .ExecutorsCount }}
memory: "512m"
labels:
version: {{ .SparkVersion }}
Expand Down
54 changes: 52 additions & 2 deletions tests/utils/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package utils

import (
"errors"
"os"

"fmt"
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
log "github.com/sirupsen/logrus"
v12 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"os"
)

type SparkJob struct {
Expand All @@ -15,6 +18,8 @@ type SparkJob struct {
Template string
ServiceAccount string
Params map[string]interface{}
Drivers int
ExecutorsCount int
}

func (spark *SparkOperatorInstallation) SubmitJob(job *SparkJob) error {
Expand All @@ -32,6 +37,9 @@ func (spark *SparkOperatorInstallation) SubmitJob(job *SparkJob) error {
if job.ServiceAccount == "" {
job.ServiceAccount = spark.InstanceName + DefaultServiceAccountSuffix
}
if job.ExecutorsCount == 0 {
job.ExecutorsCount = 1
}

yamlFile := createSparkJob(*job)
defer os.Remove(yamlFile)
Expand All @@ -41,6 +49,23 @@ func (spark *SparkOperatorInstallation) SubmitJob(job *SparkJob) error {
return err
}

// DriverPod fetches the driver pod of the given Spark job from the job's namespace.
func (spark *SparkOperatorInstallation) DriverPod(job SparkJob) (*v12.Pod, error) {
	return spark.K8sClients.CoreV1().Pods(job.Namespace).Get(DriverPodName(job.Name), v1.GetOptions{})
}

// ExecutorPods lists all executor pods belonging to the given Spark job,
// selected by the spark-role and app-name labels set by the operator.
func (spark *SparkOperatorInstallation) ExecutorPods(job SparkJob) ([]v12.Pod, error) {
	selector := fmt.Sprintf("spark-role=executor,sparkoperator.k8s.io/app-name=%s", job.Name)
	pods, err := spark.K8sClients.CoreV1().Pods(job.Namespace).List(v1.ListOptions{LabelSelector: selector})
	if err != nil {
		return nil, err
	}
	return pods.Items, nil
}

func (spark *SparkOperatorInstallation) DriverLog(job SparkJob) (string, error) {
driverPodName := DriverPodName(job.Name)
return getPodLog(spark.K8sClients, job.Namespace, driverPodName, 0)
Expand All @@ -51,6 +76,31 @@ func (spark *SparkOperatorInstallation) DriverLogContains(job SparkJob, text str
return podLogContains(spark.K8sClients, job.Namespace, driverPodName, text)
}

// SubmitAndWaitForExecutors submits the Spark job, waits for the application
// to reach the Running state, and then polls until the expected number of
// executors (job.ExecutorsCount) is reported.
func (spark *SparkOperatorInstallation) SubmitAndWaitForExecutors(job *SparkJob) error {
	// Submit the job and wait for it to start.
	if err := spark.SubmitJob(job); err != nil {
		return err
	}

	if err := spark.WaitForJobState(*job, v1beta2.RunningState); err != nil {
		return err
	}

	// Wait for the correct number of executors to show up.
	return Retry(func() error {
		executors, err := spark.GetExecutorState(*job)
		if err != nil {
			return err
		}
		if len(executors) != job.ExecutorsCount {
			return errors.New(fmt.Sprintf("The number of executors is %d, but %d is expected", len(executors), job.ExecutorsCount))
		}
		return nil
	})
}

func (spark *SparkOperatorInstallation) WaitForOutput(job SparkJob, text string) error {
log.Infof("Waiting for the following text to appear in the driver log: %s", text)
err := Retry(func() error {
Expand Down

0 comments on commit 6dee888

Please sign in to comment.