Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Rework Spark log visibility based on State
Browse files Browse the repository at this point in the history
  • Loading branch information
Anmol Khurana committed Jun 17, 2020
1 parent 13cbf39 commit bc467a6
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions go/tasks/plugins/k8s/spark/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (sparkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCo
HadoopConf: sparkJob.GetHadoopConf(),
// SubmissionFailures handled here. Task Failures handled at Propeller/Job level.
RestartPolicy: sparkOp.RestartPolicy{
Type: sparkOp.OnFailure,
Type: sparkOp.OnFailure,
OnSubmissionFailureRetries: &submissionFailureRetries,
},
},
Expand Down Expand Up @@ -213,7 +213,13 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo,
customInfoMap := make(map[string]string)

logConfig := logs.GetLogConfig()
if logConfig.IsKubernetesEnabled && sj.Status.DriverInfo.PodName != "" {

state := sj.Status.AppState.State
isQueued := state == sparkOp.NewState ||
state == sparkOp.PendingSubmissionState ||
state == sparkOp.SubmittedState

if logConfig.IsKubernetesEnabled && !isQueued || sj.Status.DriverInfo.PodName != "" {
k8sLog, err := logUtils.NewKubernetesLogPlugin(logConfig.KubernetesURL).GetTaskLog(
sj.Status.DriverInfo.PodName,
sj.Namespace,
Expand All @@ -227,7 +233,7 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo,
taskLogs = append(taskLogs, &k8sLog)
}

if logConfig.IsCloudwatchEnabled {
if logConfig.IsCloudwatchEnabled && !isQueued {
cwUserLogs := core.TaskLog{
Uri: fmt.Sprintf(
"https://console.aws.amazon.com/cloudwatch/home?region=%s#logStream:group=%s;prefix=var.log.containers.%s;streamFilter=typeLogStreamPrefix",
Expand All @@ -246,6 +252,12 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo,
Name: "System Logs (via Cloudwatch)",
MessageFormat: core.TaskLog_JSON,
}

taskLogs = append(taskLogs, &cwUserLogs)
taskLogs = append(taskLogs, &cwSystemLogs)

} else if logConfig.IsCloudwatchEnabled {

allUserLogs := core.TaskLog{
Uri: fmt.Sprintf(
"https://console.aws.amazon.com/cloudwatch/home?region=%s#logStream:group=%s;prefix=var.log.containers.%s;streamFilter=typeLogStreamPrefix",
Expand All @@ -255,8 +267,6 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo,
Name: "Spark-Submit/All User Logs (via Cloudwatch)",
MessageFormat: core.TaskLog_JSON,
}
taskLogs = append(taskLogs, &cwUserLogs)
taskLogs = append(taskLogs, &cwSystemLogs)
taskLogs = append(taskLogs, &allUserLogs)
}

Expand Down

0 comments on commit bc467a6

Please sign in to comment.