Skip to content

Commit

Permalink
Spark: Use GCP Dependency + Remove Mode (flyteorg#160)
Browse files Browse the repository at this point in the history
* Move to GCP:multi-version + remove mode

Signed-off-by: Anmol Khurana <[email protected]>

* Update log name

Signed-off-by: Anmol Khurana <[email protected]>

* Some Logging changes

Signed-off-by: Anmol Khurana <[email protected]>

* Some Logging changes

Signed-off-by: Anmol Khurana <[email protected]>
  • Loading branch information
akhurana001 authored Mar 13, 2021
1 parent 0c006cf commit d110778
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 12 deletions.
7 changes: 2 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.16

require (
cloud.google.com/go v0.78.0 // indirect
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.1.3
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625
github.com/Masterminds/semver v1.5.0
github.com/adammck/venv v0.0.0-20200610172036-e77789703e7c // indirect
github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d
Expand Down Expand Up @@ -49,7 +49,4 @@ require (
sigs.k8s.io/controller-runtime v0.8.2
)

replace (
github.com/GoogleCloudPlatform/spark-on-k8s-operator => github.com/lyft/spark-on-k8s-operator v0.1.4-0.20201027003055-c76b67e3b6d0
github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d
)
replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBp
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625 h1:cQyO5JQ2iuHnEcF3v24kdDMsgh04RjyFPDtuvD6PCE0=
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625/go.mod h1:6PnrZv6zUDkrNMw0mIoGRmGBR7i9LulhKPmxFq4rUiM=
github.com/Jeffail/gabs/v2 v2.5.1/go.mod h1:xCn81vdHKxFUuWWAaD5jCTQDNPBMh5pPs9IJ+NcziBI=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/k8s/spark/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
var (
defaultConfig = &Config{
LogConfig: LogConfig{
User: logs.LogConfig{
Mixed: logs.LogConfig{
IsKubernetesEnabled: true,
KubernetesTemplateURI: "http://localhost:30084/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}",
},
Expand Down
9 changes: 4 additions & 5 deletions go/tasks/plugins/k8s/spark/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ func (sparkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCo
Spec: sparkOp.SparkApplicationSpec{
ServiceAccount: &sparkTaskType,
Type: getApplicationType(sparkJob.GetApplicationType()),
Mode: sparkOp.ClusterMode,
Image: &container.Image,
Arguments: modifiedArgs,
Driver: driverSpec,
Expand Down Expand Up @@ -286,7 +285,7 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo,
o, err := p.GetTaskLogs(tasklog.Input{
PodName: sj.Status.DriverInfo.PodName,
Namespace: sj.Namespace,
LogName: "Driver Logs",
LogName: "(Driver Logs)",
})

if err != nil {
Expand All @@ -306,7 +305,7 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo,
o, err := p.GetTaskLogs(tasklog.Input{
PodName: sj.Status.DriverInfo.PodName,
Namespace: sj.Namespace,
LogName: "User Logs",
LogName: "(User Logs)",
})

if err != nil {
Expand All @@ -325,7 +324,7 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo,
o, err := p.GetTaskLogs(tasklog.Input{
PodName: sj.Name,
Namespace: sj.Namespace,
LogName: "System Logs",
LogName: "(System Logs)",
})

if err != nil {
Expand All @@ -345,7 +344,7 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo,
o, err := p.GetTaskLogs(tasklog.Input{
PodName: sj.Name,
Namespace: sj.Namespace,
LogName: "Spark-Submit/All User Logs",
LogName: "(Spark-Submit/All User Logs)",
})

if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion go/tasks/plugins/k8s/spark/spark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,23 @@ func TestGetEventInfo(t *testing.T) {
IsCloudwatchEnabled: true,
CloudwatchTemplateURI: "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.{{ .podName }};streamFilter=typeLogStreamPrefix",
},
Mixed: logs.LogConfig{
IsKubernetesEnabled: true,
KubernetesURL: "k8s.com",
},
},
}))
info, err := getEventInfoForSpark(dummySparkApplication(sj.RunningState))
assert.NoError(t, err)
assert.Len(t, info.Logs, 5)
assert.Len(t, info.Logs, 6)
assert.Equal(t, fmt.Sprintf("https://%s", sparkUIAddress), info.CustomInfo.Fields[sparkDriverUI].GetStringValue())
generatedLinks := make([]string, 0, len(info.Logs))
for _, l := range info.Logs {
generatedLinks = append(generatedLinks, l.Uri)
}

expectedLinks := []string{
"k8s.com/#!/log/spark-namespace/spark-pod/pod?namespace=spark-namespace",
"k8s.com/#!/log/spark-namespace/spark-pod/pod?namespace=spark-namespace",
"https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.spark-pod;streamFilter=typeLogStreamPrefix",
"https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=system_log.var.log.containers.spark-app-name;streamFilter=typeLogStreamPrefix",
Expand Down

0 comments on commit d110778

Please sign in to comment.