From f6a8187019ad211e206431ebd84786df1acfe02a Mon Sep 17 00:00:00 2001 From: Kohei Watanabe Date: Wed, 30 Aug 2023 17:57:59 +0900 Subject: [PATCH] Improve getMasterURL() to add [] to IPv6 if needed Resolves #1344 Spark 3.4 supports IPv6: - https://github.com/apache/spark/pull/36868 So I want to make the operator support IPv6. I can confirm that this can submit the spark-job in IPv6-only environment. Although it is necessary to add the following environment variables to the operator ```yaml apiVersion: apps/v1 kind: Deployment metadata: name: spark-on-k8s-spark-operator spec: template: spec: containers: - name: spark-operator env: - name: _JAVA_OPTIONS value: "-Djava.net.preferIPv6Addresses=true" - name: KUBERNETES_DISABLE_HOSTNAME_VERIFICATION value: "true" ``` --- pkg/controller/sparkapplication/submission.go | 4 ++ .../sparkapplication/submission_test.go | 51 +++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/pkg/controller/sparkapplication/submission.go b/pkg/controller/sparkapplication/submission.go index b8e4e1b8d..98bb882e0 100644 --- a/pkg/controller/sparkapplication/submission.go +++ b/pkg/controller/sparkapplication/submission.go @@ -207,6 +207,10 @@ func getMasterURL() (string, error) { if kubernetesServicePort == "" { return "", fmt.Errorf("environment variable %s is not found", kubernetesServicePortEnvVar) } + // check if the host is IPv6 address + if strings.Contains(kubernetesServiceHost, ":") && !strings.HasPrefix(kubernetesServiceHost, "[") { + return fmt.Sprintf("k8s://https://[%s]:%s", kubernetesServiceHost, kubernetesServicePort), nil + } return fmt.Sprintf("k8s://https://%s:%s", kubernetesServiceHost, kubernetesServicePort), nil } diff --git a/pkg/controller/sparkapplication/submission_test.go b/pkg/controller/sparkapplication/submission_test.go index 9f32bbb15..20e247a01 100644 --- a/pkg/controller/sparkapplication/submission_test.go +++ b/pkg/controller/sparkapplication/submission_test.go @@ -583,3 +583,54 @@ func TestProxyUserArg(t *testing.T) { assert.Equal(t, "--proxy-user", args[4]) assert.Equal(t, "foo", args[5]) } + +func Test_getMasterURL(t *testing.T) { + setEnv := func(host string, port string) { + if err := os.Setenv(kubernetesServiceHostEnvVar, host); err != nil { + t.Fatal(err) + } + if err := os.Setenv(kubernetesServicePortEnvVar, port); err != nil { + t.Fatal(err) + } + } + + tests := []struct { + name string + host string + port string + want string + wantErr assert.ErrorAssertionFunc + }{ + { + name: "should return a valid master url when IPv4 address is used", + host: "localhost", + port: "6443", + want: "k8s://https://localhost:6443", + wantErr: assert.NoError, + }, + { + name: "should return a valid master url when IPv6 address is used", + host: "::1", + port: "6443", + want: "k8s://https://[::1]:6443", + wantErr: assert.NoError, + }, + { + name: "should throw an error when the host is empty", + host: "", + port: "6443", + want: "", + wantErr: assert.Error, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + setEnv(tt.host, tt.port) + got, err := getMasterURL() + if !tt.wantErr(t, err, fmt.Sprintf("getMasterURL()")) { + return + } + assert.Equalf(t, tt.want, got, "getMasterURL()") + }) + } +}