Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
DCOS-60860 - Test host network support
Browse files Browse the repository at this point in the history
  • Loading branch information
Roman Palaznik committed Nov 13, 2019
1 parent 4bcabe6 commit 6471a85
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 2 deletions.
2 changes: 1 addition & 1 deletion kudo-operator/operator/operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ tasks:
- spark-rbac.yaml
- spark-serviceaccount.yaml
- webhook-init-job.yaml
- spark-operator-deployment.yaml
- webhook-service.yaml
- spark-operator-deployment.yaml
- spark-history-server-deployment.yaml
- spark-history-server-service.yaml

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,6 @@ spec:
- -enable-webhook=true
- -webhook-svc-namespace={{ .Namespace }}
- -webhook-port={{ .Params.webhookPort }}
- -webhook-svc-name={{ .OperatorName }}-webhook
- -webhook-svc-name={{ .Name }}-webhook
- -webhook-config-name={{ .OperatorName }}-webhook-config
{{- end }}
105 changes: 105 additions & 0 deletions tests/network_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package tests

import (
"errors"
"fmt"
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
"github.com/mesosphere/kudo-spark-operator/tests/utils"
log "github.com/sirupsen/logrus"
v12 "k8s.io/api/core/v1"
"testing"
)

/*
Test that `hostNetwork` in SparkApplication propagates to driver and executor pods
*/
// TestHostNetworkPropagation verifies that the `hostNetwork` flag set on a
// SparkApplication's driver and executor specs propagates to the created pods,
// and that pods running with host networking get the DNSClusterFirstWithHostNet
// DNS policy. All four driver/executor combinations are exercised.
func TestHostNetworkPropagation(t *testing.T) {
	spark := utils.SparkOperatorInstallation{}
	err := spark.InstallSparkOperator()
	defer spark.CleanUp()

	if err != nil {
		t.Fatal(err)
	}

	var testCases = []struct {
		driverHN   bool
		executorHN bool
	}{
		{false, false},
		{true, false},
		{false, true},
		{true, true},
	}

	for i, tc := range testCases {
		log.Infof("Running test case:\n- driver host network:\t\t%v\n- executor host network:\t%v", tc.driverHN, tc.executorHN)
		jobName := fmt.Sprintf("host-network-test-job-%d", i)
		job := utils.SparkJob{
			Name:     jobName,
			Template: "spark-mock-task-runner-job-host-network.yaml",
			Params: map[string]interface{}{
				"args":                []string{"1", "600"},
				"driverHostNetwork":   tc.driverHN,
				"executorHostNetwork": tc.executorHN,
			},
		}

		expectedExecutorCount := 1

		// Submit the job and wait for it to start.
		err = spark.SubmitJob(&job)
		if err != nil {
			t.Fatal(err)
		}
		err = spark.WaitForJobState(job, v1beta2.RunningState)
		if err != nil {
			t.Fatal(err)
		}

		// Wait for the correct number of executors to show up.
		err = utils.Retry(func() error {
			executors, err := spark.GetExecutorState(job)
			if err != nil {
				return err
			} else if len(executors) != expectedExecutorCount {
				// fmt.Errorf instead of errors.New(fmt.Sprintf(...)) (staticcheck S1028).
				return fmt.Errorf("The number of executors is %d, but %d is expected", len(executors), expectedExecutorCount)
			}
			return nil
		})
		if err != nil {
			t.Error(err)
		}

		// Verify driver pod hostNetwork and dnsPolicy values.
		driver, err := spark.DriverPod(job)
		if err != nil {
			t.Fatal(err)
		}
		log.Infof("Driver spec.hostNetwork: %v", driver.Spec.HostNetwork)
		log.Infof("Driver spec.dnspolicy: %v", driver.Spec.DNSPolicy)
		if driver.Spec.HostNetwork != tc.driverHN {
			t.Fatalf("Unexpected hostNetwork value %v for driver %s. Should be %v", driver.Spec.HostNetwork, driver.Name, tc.driverHN)
		} else if tc.driverHN && driver.Spec.DNSPolicy != v12.DNSClusterFirstWithHostNet {
			t.Fatalf("Expected driver pod DNS policy to be \"dnsClusterFirstWithHostNet\", but it's %s", driver.Spec.DNSPolicy)
		}

		// Verify executor pods hostNetwork and dnsPolicy values.
		executors, err := spark.ExecutorPods(job)
		if err != nil {
			t.Fatal(err)
		}
		for _, executor := range executors {
			log.Infof("Executor %s spec.hostNetwork: %v", executor.Name, executor.Spec.HostNetwork)
			log.Infof("Executor %s spec.dnsPolicy: %v", executor.Name, executor.Spec.DNSPolicy)
			if executor.Spec.HostNetwork != tc.executorHN {
				// Fixed copy-paste bug: this message previously said "driver".
				t.Fatalf("Unexpected hostNetwork value %v for executor %s. Should be %v", executor.Spec.HostNetwork, executor.Name, tc.executorHN)
			} else if tc.executorHN && executor.Spec.DNSPolicy != v12.DNSClusterFirstWithHostNet {
				t.Fatalf("Expected executor pod DNS policy to be \"dnsClusterFirstWithHostNet\", but it's %s", executor.Spec.DNSPolicy)
			}
		}

		// Terminate the job while it's running so the next case starts clean.
		spark.DeleteJob(job)
	}
}
36 changes: 36 additions & 0 deletions tests/templates/spark-mock-task-runner-job-host-network.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Template for a SparkApplication used by the host-network propagation test.
# The `driverHostNetwork` and `executorHostNetwork` params are rendered into
# the driver/executor `hostNetwork` fields below.
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: {{ .Name }}
namespace: {{ .Namespace }}
spec:
type: Scala
mode: cluster
image: {{ .Image }}
imagePullPolicy: Always
mainClass: MockTaskRunner
mainApplicationFile: "https://infinity-artifacts.s3.amazonaws.com/scale-tests/dcos-spark-scala-tests-assembly-2.4.0-20190325.jar"
# Render the "args" param list as quoted YAML sequence items.
arguments: {{ range $i, $arg := index .Params "args" }}
- "{{ $arg }}"{{ end }}
sparkConf:
"spark.scheduler.maxRegisteredResourcesWaitingTime": "2400s"
"spark.scheduler.minRegisteredResourcesRatio": "1.0"
sparkVersion: {{ .SparkVersion }}
restartPolicy:
type: Never
driver:
cores: 1
memory: "512m"
# Boolean under test: propagated to the driver pod's spec.hostNetwork.
hostNetwork: {{ index .Params "driverHostNetwork" }}
labels:
version: {{ .SparkVersion }}
metrics-exposed: "true"
serviceAccount: {{ .ServiceAccount }}
executor:
cores: 1
instances: 1
memory: "512m"
# Boolean under test: propagated to each executor pod's spec.hostNetwork.
hostNetwork: {{ index .Params "executorHostNetwork" }}
labels:
version: {{ .SparkVersion }}
metrics-exposed: "true"
20 changes: 20 additions & 0 deletions tests/utils/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package utils

import (
"errors"
"fmt"
log "github.com/sirupsen/logrus"
v12 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"os"
)

Expand Down Expand Up @@ -40,6 +43,23 @@ func (spark *SparkOperatorInstallation) SubmitJob(job *SparkJob) error {
return err
}

// DriverPod fetches the driver pod for the given job from the job's namespace.
func (spark *SparkOperatorInstallation) DriverPod(job SparkJob) (*v12.Pod, error) {
	return spark.K8sClients.CoreV1().Pods(job.Namespace).Get(driverPodName(job.Name), v1.GetOptions{})
}

// ExecutorPods lists all executor pods belonging to the given job, selected
// by the operator-applied labels.
func (spark *SparkOperatorInstallation) ExecutorPods(job SparkJob) ([]v12.Pod, error) {
	selector := fmt.Sprintf("spark-role=executor,sparkoperator.k8s.io/app-name=%s", job.Name)
	podList, err := spark.K8sClients.CoreV1().Pods(job.Namespace).List(v1.ListOptions{
		LabelSelector: selector,
	})
	if err != nil {
		return nil, err
	}
	return podList.Items, nil
}

func (spark *SparkOperatorInstallation) DriverLog(job SparkJob) (string, error) {
driverPodName := driverPodName(job.Name)
return getPodLog(spark.K8sClients, job.Namespace, driverPodName, 0)
Expand Down

0 comments on commit 6471a85

Please sign in to comment.