From cf286ab95ec60cfa355ba4143fb73561d645c203 Mon Sep 17 00:00:00 2001 From: akhurana001 <34587798+akhurana001@users.noreply.github.com> Date: Fri, 12 Mar 2021 16:19:25 -0800 Subject: [PATCH] Spark: Use GCP Dependency + Remove Mode (#160) * Move to GCP:multi-version + remove mode Signed-off-by: Anmol Khurana * Update log name Signed-off-by: Anmol Khurana * Some Logging changes Signed-off-by: Anmol Khurana * Some Logging changes Signed-off-by: Anmol Khurana --- flyteplugins/go.mod | 7 ++----- flyteplugins/go.sum | 2 ++ flyteplugins/go/tasks/plugins/k8s/spark/config.go | 2 +- flyteplugins/go/tasks/plugins/k8s/spark/spark.go | 9 ++++----- flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go | 7 ++++++- 5 files changed, 15 insertions(+), 12 deletions(-) diff --git a/flyteplugins/go.mod b/flyteplugins/go.mod index 258354dbb7..4a826a9917 100644 --- a/flyteplugins/go.mod +++ b/flyteplugins/go.mod @@ -4,7 +4,7 @@ go 1.16 require ( cloud.google.com/go v0.78.0 // indirect - github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.1.3 + github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625 github.com/Masterminds/semver v1.5.0 github.com/adammck/venv v0.0.0-20200610172036-e77789703e7c // indirect github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d @@ -49,7 +49,4 @@ require ( sigs.k8s.io/controller-runtime v0.8.2 ) -replace ( - github.com/GoogleCloudPlatform/spark-on-k8s-operator => github.com/lyft/spark-on-k8s-operator v0.1.4-0.20201027003055-c76b67e3b6d0 - github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d -) +replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d diff --git a/flyteplugins/go.sum b/flyteplugins/go.sum index 3074c1fd31..9fd1c04b87 100644 --- a/flyteplugins/go.sum +++ b/flyteplugins/go.sum @@ -75,6 +75,8 @@ github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBp github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625 h1:cQyO5JQ2iuHnEcF3v24kdDMsgh04RjyFPDtuvD6PCE0= +github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625/go.mod h1:6PnrZv6zUDkrNMw0mIoGRmGBR7i9LulhKPmxFq4rUiM= github.com/Jeffail/gabs/v2 v2.5.1/go.mod h1:xCn81vdHKxFUuWWAaD5jCTQDNPBMh5pPs9IJ+NcziBI= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= diff --git a/flyteplugins/go/tasks/plugins/k8s/spark/config.go b/flyteplugins/go/tasks/plugins/k8s/spark/config.go index c1c601fd74..7f78734ab2 100644 --- a/flyteplugins/go/tasks/plugins/k8s/spark/config.go +++ b/flyteplugins/go/tasks/plugins/k8s/spark/config.go @@ -10,7 +10,7 @@ import ( var ( defaultConfig = &Config{ LogConfig: LogConfig{ - User: logs.LogConfig{ + Mixed: logs.LogConfig{ IsKubernetesEnabled: true, KubernetesTemplateURI: "http://localhost:30084/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", }, diff --git a/flyteplugins/go/tasks/plugins/k8s/spark/spark.go b/flyteplugins/go/tasks/plugins/k8s/spark/spark.go index ce0d333469..3e5608c4b4 100755 --- a/flyteplugins/go/tasks/plugins/k8s/spark/spark.go +++ b/flyteplugins/go/tasks/plugins/k8s/spark/spark.go @@ -186,7 +186,6 @@ func (sparkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCo Spec: sparkOp.SparkApplicationSpec{ ServiceAccount: &sparkTaskType, Type: getApplicationType(sparkJob.GetApplicationType()), - Mode: sparkOp.ClusterMode, Image: &container.Image, Arguments: modifiedArgs, Driver: driverSpec, @@ -286,7 +285,7 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo, o, err := p.GetTaskLogs(tasklog.Input{ PodName: sj.Status.DriverInfo.PodName, Namespace: sj.Namespace, - LogName: "Driver Logs", + LogName: "(Driver Logs)", }) if err != nil { @@ -306,7 +305,7 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo, o, err := p.GetTaskLogs(tasklog.Input{ PodName: sj.Status.DriverInfo.PodName, Namespace: sj.Namespace, - LogName: "User Logs", + LogName: "(User Logs)", }) if err != nil { @@ -325,7 +324,7 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo, o, err := p.GetTaskLogs(tasklog.Input{ PodName: sj.Name, Namespace: sj.Namespace, - LogName: "System Logs", + LogName: "(System Logs)", }) if err != nil { @@ -345,7 +344,7 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo, o, err := p.GetTaskLogs(tasklog.Input{ PodName: sj.Name, Namespace: sj.Namespace, - LogName: "Spark-Submit/All User Logs", + LogName: "(Spark-Submit/All User Logs)", }) if err != nil { diff --git a/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go b/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go index ed3194a46e..bb7ad420a5 100755 --- a/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go @@ -81,11 +81,15 @@ func TestGetEventInfo(t *testing.T) { IsCloudwatchEnabled: true, CloudwatchTemplateURI: "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.{{ .podName }};streamFilter=typeLogStreamPrefix", }, + Mixed: logs.LogConfig{ + IsKubernetesEnabled: true, + KubernetesURL: "k8s.com", + }, }, })) info, err := getEventInfoForSpark(dummySparkApplication(sj.RunningState)) assert.NoError(t, err) - assert.Len(t, info.Logs, 5) + assert.Len(t, info.Logs, 6) assert.Equal(t, fmt.Sprintf("https://%s", sparkUIAddress), info.CustomInfo.Fields[sparkDriverUI].GetStringValue()) generatedLinks := make([]string, 0, len(info.Logs)) for _, l := range info.Logs { @@ -93,6 +97,7 @@ func TestGetEventInfo(t *testing.T) { } expectedLinks := []string{ + "k8s.com/#!/log/spark-namespace/spark-pod/pod?namespace=spark-namespace", "k8s.com/#!/log/spark-namespace/spark-pod/pod?namespace=spark-namespace", "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.spark-pod;streamFilter=typeLogStreamPrefix", "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=system_log.var.log.containers.spark-app-name;streamFilter=typeLogStreamPrefix",