From bc467a6ff12e6c682bc3506c6ad983be6604f319 Mon Sep 17 00:00:00 2001 From: Anmol Khurana Date: Tue, 16 Jun 2020 17:03:45 -0700 Subject: [PATCH] Rework Spark log visibility based on State --- go/tasks/plugins/k8s/spark/spark.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/go/tasks/plugins/k8s/spark/spark.go b/go/tasks/plugins/k8s/spark/spark.go index 96bc29ac8..d0e69606f 100755 --- a/go/tasks/plugins/k8s/spark/spark.go +++ b/go/tasks/plugins/k8s/spark/spark.go @@ -168,7 +168,7 @@ func (sparkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCo HadoopConf: sparkJob.GetHadoopConf(), // SubmissionFailures handled here. Task Failures handled at Propeller/Job level. RestartPolicy: sparkOp.RestartPolicy{ - Type: sparkOp.OnFailure, + Type: sparkOp.OnFailure, OnSubmissionFailureRetries: &submissionFailureRetries, }, }, @@ -213,7 +213,13 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo, customInfoMap := make(map[string]string) logConfig := logs.GetLogConfig() - if logConfig.IsKubernetesEnabled && sj.Status.DriverInfo.PodName != "" { + + state := sj.Status.AppState.State + isQueued := state == sparkOp.NewState || + state == sparkOp.PendingSubmissionState || + state == sparkOp.SubmittedState + + if logConfig.IsKubernetesEnabled && !isQueued || sj.Status.DriverInfo.PodName != "" { k8sLog, err := logUtils.NewKubernetesLogPlugin(logConfig.KubernetesURL).GetTaskLog( sj.Status.DriverInfo.PodName, sj.Namespace, @@ -227,7 +233,7 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo, taskLogs = append(taskLogs, &k8sLog) } - if logConfig.IsCloudwatchEnabled { + if logConfig.IsCloudwatchEnabled && !isQueued { cwUserLogs := core.TaskLog{ Uri: fmt.Sprintf( "https://console.aws.amazon.com/cloudwatch/home?region=%s#logStream:group=%s;prefix=var.log.containers.%s;streamFilter=typeLogStreamPrefix", @@ -246,6 +252,12 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo, Name: "System Logs (via Cloudwatch)", MessageFormat: core.TaskLog_JSON, } + + taskLogs = append(taskLogs, &cwUserLogs) + taskLogs = append(taskLogs, &cwSystemLogs) + + } else if logConfig.IsCloudwatchEnabled { + allUserLogs := core.TaskLog{ Uri: fmt.Sprintf( "https://console.aws.amazon.com/cloudwatch/home?region=%s#logStream:group=%s;prefix=var.log.containers.%s;streamFilter=typeLogStreamPrefix", @@ -255,8 +267,6 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo, Name: "Spark-Submit/All User Logs (via Cloudwatch)", MessageFormat: core.TaskLog_JSON, } - taskLogs = append(taskLogs, &cwUserLogs) - taskLogs = append(taskLogs, &cwSystemLogs) taskLogs = append(taskLogs, &allUserLogs) }