This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

DCOS-60860 - Test host network support #61

Merged
merged 2 commits on Nov 19, 2019

103 changes: 103 additions & 0 deletions tests/network_test.go
@@ -0,0 +1,103 @@
package tests

import (
"errors"
"fmt"
"github.com/mesosphere/kudo-spark-operator/tests/utils"
log "github.com/sirupsen/logrus"
v12 "k8s.io/api/core/v1"
"testing"
)

/*
Test that `hostNetwork` in SparkApplication propagates to driver and executor pods
*/
func TestHostNetworkPropagation(t *testing.T) {
spark := utils.SparkOperatorInstallation{}
err := spark.InstallSparkOperator()
defer spark.CleanUp()

if err != nil {
t.Fatal(err)
}

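// Each test case toggles host networking independently for the driver and the executors.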
var testCases = []struct {
driverHN bool
executorHN bool
}{
{false, false},
{true, false},
{false, true},
{true, true},
}

for i, tc := range testCases {
log.Infof("Running test case:\n- driver host network:\t\t%v\n- executor host network:\t%v", tc.driverHN, tc.executorHN)
jobName := fmt.Sprintf("host-network-test-job-%d", i)
job := utils.SparkJob{
Name: jobName,
Template: "spark-mock-task-runner-job-host-network.yaml",
Params: map[string]interface{}{
"args": []string{"1", "600"},
"driverHostNetwork": tc.driverHN,
"executorHostNetwork": tc.executorHN,
},
}

// Submit the job and wait for it to start
err = spark.SubmitAndWaitForExecutors(&job)
if err != nil {
t.Fatal(err)
}

// Verify driver pod hostNetwork and dnsPolicy values
driver, err := spark.DriverPod(job)
if err != nil {
t.Fatal(err)
}
log.Infof("Verifying driver %s spec values", driver.Name)
err = verifyPodHostNetwork(driver, tc.driverHN)
if err != nil {
t.Fatal(err)
}

// Verify executor pods hostNetwork and dnsPolicy values
executors, err := spark.ExecutorPods(job)
if err != nil {
t.Fatal(err)
}
for _, executor := range executors {
log.Infof("Verifying executor %s spec values", executor.Name)
err = verifyPodHostNetwork(&executor, tc.executorHN)
if err != nil {
t.Fatal(err)
}
}

// Terminate the job while it's running
spark.DeleteJob(job)
}
}

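// verifyPodHostNetwork checks that a pod's hostNetwork, dnsPolicy, and IP addresses match the expected host networking mode.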
func verifyPodHostNetwork(pod *v12.Pod, expectedHostNetwork bool) error {
log.Infof("Pod spec.hostNetwork: %v", pod.Spec.HostNetwork)
log.Infof("Pod spec.dnspolicy: %v", pod.Spec.DNSPolicy)

// Check spec values
if pod.Spec.HostNetwork != expectedHostNetwork {
return errors.New(fmt.Sprintf("Unexpected hostNetwork value for pod %v: %s. Should be %v", pod.Spec.HostNetwork, pod.Name, expectedHostNetwork))
} else if expectedHostNetwork && pod.Spec.DNSPolicy != v12.DNSClusterFirstWithHostNet {
return errors.New(fmt.Sprintf("Expected pod pod DNS policy to be \"dnsClusterFirstWithHostNet\", but it's %s", pod.Spec.DNSPolicy))
}

// Check pod IP
log.Infof("Pod status.podIP: %v", pod.Status.PodIP)
log.Infof("Pod status.hostIP: %v", pod.Status.HostIP)
if expectedHostNetwork && pod.Status.PodIP != pod.Status.HostIP {
return errors.New(fmt.Sprintf("Pod %s IP doesn't match the host IP", pod.Name))
} else if !expectedHostNetwork && pod.Status.PodIP == pod.Status.HostIP {
return errors.New(fmt.Sprintf("Pod %s IP matches the host IP", pod.Name))
}

return nil
}
52 changes: 7 additions & 45 deletions tests/smoke_test.go
@@ -3,7 +3,6 @@ package tests
import (
"errors"
"fmt"
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
"github.com/mesosphere/kudo-spark-operator/tests/utils"
log "github.com/sirupsen/logrus"
v12 "k8s.io/api/core/v1"
@@ -27,33 +26,16 @@ func TestShuffleAppDriverOutput(t *testing.T) {

jobName := "shuffle-app"
job := utils.SparkJob{
Name: jobName,
Template: "spark-shuffle-job.yaml",
Name: jobName,
Template: "spark-shuffle-job.yaml",
ExecutorsCount: expectedExecutorCount,
Params: map[string]interface{}{
"executor_count": expectedExecutorCount,
"args": []string{"4", strconv.Itoa(expectedGroupCount), "100", "4", "1500"},
"args": []string{"4", strconv.Itoa(expectedGroupCount), "100", "4", "1500"},
},
}

err = spark.SubmitJob(&job)
if err != nil {
t.Fatal(err)
}
err = spark.WaitForJobState(job, v1beta2.RunningState)
if err != nil {
t.Fatal(err)
}

// Wait for correct number of executors to show up
err = utils.Retry(func() error {
executors, err := spark.GetExecutorState(job)
if err != nil {
return err
} else if len(executors) != expectedExecutorCount {
return errors.New(fmt.Sprintf("The number of executors is %d, but %d is expected", len(executors), expectedExecutorCount))
}
return nil
})
// Submit the job and wait for it to start
err = spark.SubmitAndWaitForExecutors(&job)
if err != nil {
t.Fatal(err)
}
@@ -81,32 +63,12 @@ func TestRunningAppDeletion(t *testing.T) {
"args": []string{"1", "600"},
},
}
expectedExecutorCount := 1

// Submit the job and wait for it to start
err = spark.SubmitJob(&job)
err = spark.SubmitAndWaitForExecutors(&job)
if err != nil {
t.Fatal(err)
}
err = spark.WaitForJobState(job, v1beta2.RunningState)
if err != nil {
t.Fatal(err)
}

// Wait for correct number of executors to show up

err = utils.Retry(func() error {
executors, err := spark.GetExecutorState(job)
if err != nil {
return err
} else if len(executors) != expectedExecutorCount {
return errors.New(fmt.Sprintf("The number of executors is %d, but %d is expected", len(executors), expectedExecutorCount))
}
return nil
})
if err != nil {
t.Error(err)
}

// Terminate the job while it's running
spark.DeleteJob(job)
@@ -46,7 +46,7 @@ spec:
serviceAccount: {{ .ServiceAccount }}
executor:
cores: 1
instances: 1
instances: {{ .ExecutorsCount }}
memory: "512m"
labels:
version: {{ .SparkVersion }}
2 changes: 1 addition & 1 deletion tests/templates/spark-linear-regression-job.yaml
@@ -35,7 +35,7 @@ spec:
serviceAccount: {{ .ServiceAccount }}
executor:
cores: 1
instances: 1
instances: {{ .ExecutorsCount }}
memory: "512m"
labels:
version: {{ .SparkVersion }}
36 changes: 36 additions & 0 deletions tests/templates/spark-mock-task-runner-job-host-network.yaml
@@ -0,0 +1,36 @@
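# SparkApplication template for MockTaskRunner with configurable hostNetwork settings for the driver and executor pods.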
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: {{ .Name }}
namespace: {{ .Namespace }}
spec:
type: Scala
mode: cluster
image: {{ .Image }}
imagePullPolicy: Always
mainClass: MockTaskRunner
mainApplicationFile: "https://infinity-artifacts.s3.amazonaws.com/scale-tests/dcos-spark-scala-tests-assembly-2.4.0-20190325.jar"
arguments: {{ range $i, $arg := index .Params "args" }}
- "{{ $arg }}"{{ end }}
sparkConf:
"spark.scheduler.maxRegisteredResourcesWaitingTime": "2400s"
"spark.scheduler.minRegisteredResourcesRatio": "1.0"
sparkVersion: {{ .SparkVersion }}
restartPolicy:
type: Never
driver:
cores: 1
memory: "512m"
hostNetwork: {{ index .Params "driverHostNetwork" }}
labels:
version: {{ .SparkVersion }}
metrics-exposed: "true"
serviceAccount: {{ .ServiceAccount }}
executor:
cores: 1
instances: {{ .ExecutorsCount }}
memory: "512m"
hostNetwork: {{ index .Params "executorHostNetwork" }}
labels:
version: {{ .SparkVersion }}
metrics-exposed: "true"
2 changes: 1 addition & 1 deletion tests/templates/spark-mock-task-runner-job.yaml
@@ -38,7 +38,7 @@ spec:
{{- end }}
executor:
cores: 1
instances: 1
instances: {{ .ExecutorsCount }}
memory: "512m"
labels:
version: {{ .SparkVersion }}
2 changes: 1 addition & 1 deletion tests/templates/spark-pi.yaml
@@ -24,7 +24,7 @@ spec:
serviceAccount: {{ .ServiceAccount }}
executor:
cores: 1
instances: 1
instances: {{ .ExecutorsCount }}
memory: "512m"
labels:
version: {{ .SparkVersion }}
2 changes: 1 addition & 1 deletion tests/templates/spark-shuffle-job.yaml
@@ -27,7 +27,7 @@ spec:
serviceAccount: {{ .ServiceAccount }}
executor:
cores: 1
instances: {{ index .Params "executor_count" }}
instances: {{ .ExecutorsCount }}
memory: "512m"
labels:
version: {{ .SparkVersion }}
54 changes: 52 additions & 2 deletions tests/utils/job.go
@@ -2,9 +2,12 @@ package utils

import (
"errors"
"os"

"fmt"
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
log "github.com/sirupsen/logrus"
v12 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"os"
)

type SparkJob struct {
@@ -15,6 +18,8 @@ type SparkJob struct {
Template string
ServiceAccount string
Params map[string]interface{}
Drivers int
ExecutorsCount int
}

func (spark *SparkOperatorInstallation) SubmitJob(job *SparkJob) error {
@@ -32,6 +37,9 @@ func (spark *SparkOperatorInstallation) SubmitJob(job *SparkJob) error {
if job.ServiceAccount == "" {
job.ServiceAccount = spark.InstanceName + DefaultServiceAccountSuffix
}
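// Default to a single executor when no count is specified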
if job.ExecutorsCount == 0 {
job.ExecutorsCount = 1
}

yamlFile := createSparkJob(*job)
defer os.Remove(yamlFile)
@@ -41,6 +49,23 @@
return err
}

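// DriverPod returns the driver pod of the given Spark job.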
func (spark *SparkOperatorInstallation) DriverPod(job SparkJob) (*v12.Pod, error) {
pod, err := spark.K8sClients.CoreV1().Pods(job.Namespace).Get(DriverPodName(job.Name), v1.GetOptions{})
return pod, err
}

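// ExecutorPods lists all executor pods belonging to the given Spark job, selected by the operator's pod labels.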
func (spark *SparkOperatorInstallation) ExecutorPods(job SparkJob) ([]v12.Pod, error) {
pods, err := spark.K8sClients.CoreV1().Pods(job.Namespace).List(v1.ListOptions{
LabelSelector: fmt.Sprintf("spark-role=executor,sparkoperator.k8s.io/app-name=%s", job.Name),
})

if err != nil {
return nil, err
}

return pods.Items, nil
}

func (spark *SparkOperatorInstallation) DriverLog(job SparkJob) (string, error) {
driverPodName := DriverPodName(job.Name)
return getPodLog(spark.K8sClients, job.Namespace, driverPodName, 0)
@@ -51,6 +76,31 @@ func (spark *SparkOperatorInstallation) DriverLogContains(job SparkJob, text str
return podLogContains(spark.K8sClients, job.Namespace, driverPodName, text)
}

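// SubmitAndWaitForExecutors submits the job, waits for it to reach RunningState, and retries until the expected number of executors is running.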
func (spark *SparkOperatorInstallation) SubmitAndWaitForExecutors(job *SparkJob) error {
// Submit the job and wait for it to start
err := spark.SubmitJob(job)
if err != nil {
return err
}

err = spark.WaitForJobState(*job, v1beta2.RunningState)
if err != nil {
return err
}

// Wait for correct number of executors to show up
err = Retry(func() error {
executors, err := spark.GetExecutorState(*job)
if err != nil {
return err
} else if len(executors) != job.ExecutorsCount {
return errors.New(fmt.Sprintf("The number of executors is %d, but %d is expected", len(executors), job.ExecutorsCount))
}
return nil
})
return err
}

func (spark *SparkOperatorInstallation) WaitForOutput(job SparkJob, text string) error {
log.Infof("Waiting for the following text to appear in the driver log: %s", text)
err := Retry(func() error {