This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Support Optional Spark features #118

Merged
merged 3 commits on Sep 7, 2020
Changes from 2 commits
6 changes: 6 additions & 0 deletions go/tasks/config_load_test.go
@@ -84,6 +84,12 @@ func TestLoadConfig(t *testing.T) {
t.Run("spark-config-test", func(t *testing.T) {
assert.NotNil(t, spark.GetSparkConfig())
assert.NotNil(t, spark.GetSparkConfig().DefaultSparkConfig)
assert.Equal(t, 2, len(spark.GetSparkConfig().Features))
assert.Equal(t, "feature1", spark.GetSparkConfig().Features[0].Name)
assert.Equal(t, "feature2", spark.GetSparkConfig().Features[1].Name)
assert.Equal(t, 2, len(spark.GetSparkConfig().Features[0].SparkConfig))
assert.Equal(t, 2, len(spark.GetSparkConfig().Features[1].SparkConfig))

})

t.Run("sagemaker-config-test", func(t *testing.T) {
44 changes: 40 additions & 4 deletions go/tasks/plugins/k8s/spark/spark.go
@@ -3,17 +3,17 @@ package spark
import (
"context"
"fmt"
"time"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"

"github.com/lyft/flyteplugins/go/tasks/errors"
"github.com/lyft/flyteplugins/go/tasks/logs"
pluginsCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/utils"

"k8s.io/client-go/kubernetes/scheme"

sparkOp "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta1"
@@ -22,7 +22,9 @@ import (
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/plugins"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/lyft/flyteplugins/go/tasks/errors"
"regexp"
"strings"
"time"

pluginsConfig "github.com/lyft/flyteplugins/go/tasks/config"
)
@@ -31,12 +33,20 @@ const KindSparkApplication = "SparkApplication"
const sparkDriverUI = "sparkDriverUI"
const sparkHistoryUI = "sparkHistoryUI"

var featureRegex = regexp.MustCompile(`^spark.((lyft)|(flyte)).(.+).enabled$`)

var sparkTaskType = "spark"

// Spark-specific configs
type Config struct {
DefaultSparkConfig map[string]string `json:"spark-config-default" pflag:",Key value pairs of default spark configuration that should be applied to every SparkJob"`
SparkHistoryServerURL string `json:"spark-history-server-url" pflag:",URL for SparkHistory Server that each job will publish the execution history to."`
Features []Feature `json:"features" pflag:",List of optional features supported."`
}

type Feature struct {
Name string `json:"name"`
SparkConfig map[string]string `json:"spark-config"`
}

var (
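For reference, featureRegex treats conf keys of the form spark.lyft.<name>.enabled or spark.flyte.<name>.enabled as feature flags, and the feature name is carried in the last capture group. A quick illustration of the parsing (illustrative, not part of this diff):

	matches := featureRegex.FindAllStringSubmatch("spark.flyte.feature1.enabled", -1)
	// matches[0] == ["spark.flyte.feature1.enabled", "flyte", "", "flyte", "feature1"]
	// featureName := matches[0][len(matches[0])-1] // "feature1"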
@@ -139,7 +149,12 @@ func (sparkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCo
}

for k, v := range sparkJob.GetSparkConf() {
sparkConfig[k] = v
// Add optional features if present.
if featureRegex.MatchString(k) {
addConfig(sparkConfig, k, v)
} else {
sparkConfig[k] = v
}
}

// Set pod limits.
@@ -184,6 +199,27 @@ func (sparkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCo
return j, nil
}

func addConfig(sparkConfig map[string]string, key string, value string) {

if strings.TrimSpace(value) != "true" {
Contributor: nit: to lower?

Contributor Author: done

return
}

matches := featureRegex.FindAllStringSubmatch(key, -1)
if len(matches) == 0 {
Contributor: Suggested change:
-	if len(matches) == 0 {
+	if len(matches) == 0 || len(matches[0]) == 0 {

Contributor Author: done

return
}
featureName := matches[0][len(matches[0])-1]

for _, feature := range GetSparkConfig().Features {
if feature.Name == featureName {
for k, v := range feature.SparkConfig {
sparkConfig[k] = v
Contributor: So if multiple features override the same spark configs, the last one wins... is that OK/intentional/undesired (so we can add validation)? Whichever it is, I would add docs on the config to explain the behavior.

Contributor Author: Ideally the feature names should be unique, so this could be a nested map instead of a list of features. In the future, though, I can see this being extended to check values as well, i.e. feature name + value would be unique, so a struct makes better sense here. For now, I think using the first matching config makes sense (instead of the last); added that with comments. See the sketch after this function.

}
}
}
}
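Taken together, the three review fixes above ("to lower", the bounds check, and first-match-wins) suggest the function ends up roughly as follows; a sketch of the post-review shape, not the merged code:

	func addConfig(sparkConfig map[string]string, key string, value string) {
		// Only apply a feature when its flag value is truthy (case-insensitive).
		if strings.ToLower(strings.TrimSpace(value)) != "true" {
			return
		}
		matches := featureRegex.FindAllStringSubmatch(key, -1)
		if len(matches) == 0 || len(matches[0]) == 0 {
			return
		}
		// The feature name is the last capture group of featureRegex.
		featureName := matches[0][len(matches[0])-1]
		for _, feature := range GetSparkConfig().Features {
			if feature.Name == featureName {
				for k, v := range feature.SparkConfig {
					sparkConfig[k] = v
				}
				// First matching feature wins; later duplicates are ignored.
				return
			}
		}
	}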

// Convert SparkJob ApplicationType to Operator CRD ApplicationType
func getApplicationType(applicationType plugins.SparkApplication_Type) sparkOp.SparkApplicationType {
switch applicationType {
48 changes: 39 additions & 9 deletions go/tasks/plugins/k8s/spark/spark_test.go
@@ -34,11 +34,13 @@ const sparkUIAddress = "spark-ui.flyte"

var (
dummySparkConf = map[string]string{
"spark.driver.memory": "500M",
"spark.driver.cores": "1",
"spark.executor.cores": "1",
"spark.executor.instances": "3",
"spark.executor.memory": "500M",
"spark.driver.memory": "500M",
"spark.driver.cores": "1",
"spark.executor.cores": "1",
"spark.executor.instances": "3",
"spark.executor.memory": "500M",
"spark.flyte.feature1.enabled": "true",
"spark.lyft.feature2.enabled": "true",
}

dummyEnvVars = []*core.KeyValuePair{
@@ -271,6 +273,19 @@ func TestBuildResourceSpark(t *testing.T) {
// Case1: Valid Spark Task-Template
taskTemplate := dummySparkTaskTemplate("blah-1")

// Set spark custom feature config.
assert.NoError(t, setSparkConfig(&Config{
Features: []Feature{
{
Name: "feature1",
SparkConfig: map[string]string{"spark.hadoop.feature1": "true"},
},
{
Name: "feature2",
SparkConfig: map[string]string{"spark.hadoop.feature2": "true"},
},
},
}))
resource, err := sparkResourceHandler.BuildResource(context.TODO(), dummySparkTaskContext(taskTemplate))
assert.Nil(t, err)

@@ -285,10 +300,25 @@

for confKey, confVal := range dummySparkConf {
exists := false
for k, v := range sparkApp.Spec.SparkConf {
if k == confKey {
assert.Equal(t, v, confVal)
exists = true

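// Feature-flag keys are not copied into SparkConf themselves; assert instead
// that the feature's expanded config ("spark.hadoop.<name>") was applied.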
if featureRegex.MatchString(confKey) {
match := featureRegex.FindAllStringSubmatch(confKey, -1)
feature := match[0][len(match[0])-1]
assert.True(t, feature == "feature1" || feature == "feature2")
for k, v := range sparkApp.Spec.SparkConf {
key := "spark.hadoop." + feature
if k == key {
assert.Equal(t, v, "true")
exists = true
}
}
} else {
for k, v := range sparkApp.Spec.SparkConf {

if k == confKey {
assert.Equal(t, v, confVal)
exists = true
}
}
}
assert.True(t, exists)
9 changes: 9 additions & 0 deletions go/tasks/testdata/config.yaml
@@ -65,6 +65,15 @@ plugins:
- spark.hadoop.fs.s3a.multipart.threshold: "536870912"
- spark.blacklist.enabled: "true"
- spark.blacklist.timeout: "5m"
features:
- name: "feature1"
spark-config:
- spark.hadoop.feature1 : "true"
- spark.sql.feature1 : "true"
- name: "feature2"
spark-config:
- spark.hadoop.feature2: "true"
- spark.sql.feature2: "true"
# Logging configuration
logs:
kubernetes-enabled: true
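With this configuration in place, a task presumably opts into a feature by setting only the flag in its own spark conf; the plugin expands it at BuildResource time. A hypothetical task-side conf, using the testdata feature names above:

	sparkConf := map[string]string{
		// Expanded by the plugin into spark.hadoop.feature1 and spark.sql.feature1.
		"spark.flyte.feature1.enabled": "true",
	}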