Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Decouple submit-pod phase + logs visibility (#97)
Browse files Browse the repository at this point in the history
* Spark-submit Logs

* lint

* Rework Spark log visibility based on State

* Rework Spark log visibility based on State

* Rework Spark log visibility based on State

* Add tests
  • Loading branch information
akhurana001 authored Jun 17, 2020
1 parent 3a54059 commit e13c7b8
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 11 deletions.
5 changes: 3 additions & 2 deletions go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,9 @@ func PhaseInfoQueued(t time.Time, version uint32, reason string) PhaseInfo {
return pi
}

func PhaseInfoInitializing(t time.Time, version uint32, reason string) PhaseInfo {
pi := phaseInfo(PhaseInitializing, version, nil, &TaskInfo{OccurredAt: &t})
func PhaseInfoInitializing(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo {

pi := phaseInfo(PhaseInitializing, version, nil, info)
pi.reason = reason
return pi
}
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) {
// ErrImagePull -> Transitionary phase to ImagePullBackOff
// ContainerCreating -> Image is being downloaded
// PodInitializing -> Init containers are running
return pluginsCore.PhaseInfoInitializing(c.LastTransitionTime.Time, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage)), nil
return pluginsCore.PhaseInfoInitializing(c.LastTransitionTime.Time, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &c.LastTransitionTime.Time}), nil

case "CreateContainerError":
// This happens if for instance the command to the container is incorrect, ie doesn't run
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/array/core/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idl

switch p, version := state.GetPhase(); p {
case PhaseStart:
phaseInfo = core.PhaseInfoInitializing(t, core.DefaultPhaseVersion, state.GetReason())
phaseInfo = core.PhaseInfoInitializing(t, core.DefaultPhaseVersion, state.GetReason(), &core.TaskInfo{OccurredAt: &t})

case PhasePreLaunch:
version := GetPhaseVersionOffset(p, 1) + version
Expand Down
26 changes: 20 additions & 6 deletions go/tasks/plugins/k8s/spark/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,13 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo,
customInfoMap := make(map[string]string)

logConfig := logs.GetLogConfig()
if logConfig.IsKubernetesEnabled && sj.Status.DriverInfo.PodName != "" {

state := sj.Status.AppState.State
isQueued := state == sparkOp.NewState ||
state == sparkOp.PendingSubmissionState ||
state == sparkOp.SubmittedState

if logConfig.IsKubernetesEnabled && !isQueued && sj.Status.DriverInfo.PodName != "" {
k8sLog, err := logUtils.NewKubernetesLogPlugin(logConfig.KubernetesURL).GetTaskLog(
sj.Status.DriverInfo.PodName,
sj.Namespace,
Expand All @@ -227,7 +233,7 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo,
taskLogs = append(taskLogs, &k8sLog)
}

if logConfig.IsCloudwatchEnabled {
if logConfig.IsCloudwatchEnabled && !isQueued {
cwUserLogs := core.TaskLog{
Uri: fmt.Sprintf(
"https://console.aws.amazon.com/cloudwatch/home?region=%s#logStream:group=%s;prefix=var.log.containers.%s;streamFilter=typeLogStreamPrefix",
Expand All @@ -246,6 +252,14 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo,
Name: "System Logs (via Cloudwatch)",
MessageFormat: core.TaskLog_JSON,
}

taskLogs = append(taskLogs, &cwUserLogs)
taskLogs = append(taskLogs, &cwSystemLogs)

}

if logConfig.IsCloudwatchEnabled {

allUserLogs := core.TaskLog{
Uri: fmt.Sprintf(
"https://console.aws.amazon.com/cloudwatch/home?region=%s#logStream:group=%s;prefix=var.log.containers.%s;streamFilter=typeLogStreamPrefix",
Expand All @@ -255,8 +269,6 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo,
Name: "Spark-Submit/All User Logs (via Cloudwatch)",
MessageFormat: core.TaskLog_JSON,
}
taskLogs = append(taskLogs, &cwUserLogs)
taskLogs = append(taskLogs, &cwSystemLogs)
taskLogs = append(taskLogs, &allUserLogs)
}

Expand Down Expand Up @@ -303,8 +315,10 @@ func (sparkResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8s.

occurredAt := time.Now()
switch app.Status.AppState.State {
case sparkOp.NewState, sparkOp.SubmittedState, sparkOp.PendingSubmissionState:
return pluginsCore.PhaseInfoQueued(occurredAt, pluginsCore.DefaultPhaseVersion, "job submitted"), nil
case sparkOp.NewState:
return pluginsCore.PhaseInfoQueued(occurredAt, pluginsCore.DefaultPhaseVersion, "job queued"), nil
case sparkOp.SubmittedState, sparkOp.PendingSubmissionState:
return pluginsCore.PhaseInfoInitializing(occurredAt, pluginsCore.DefaultPhaseVersion, "job submitted", info), nil
case sparkOp.FailedSubmissionState:
reason := fmt.Sprintf("Spark Job Submission Failed with Error: %s", app.Status.AppState.ErrorMessage)
return pluginsCore.PhaseInfoRetryableFailure(errors.DownstreamSystemError, reason, info), nil
Expand Down
9 changes: 8 additions & 1 deletion go/tasks/plugins/k8s/spark/spark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,26 @@ func TestGetEventInfo(t *testing.T) {
}))
info, err := getEventInfoForSpark(dummySparkApplication(sj.RunningState))
assert.NoError(t, err)
assert.Len(t, info.Logs, 5)
assert.Equal(t, fmt.Sprintf("https://%s", sparkUIAddress), info.CustomInfo.Fields[sparkDriverUI].GetStringValue())
assert.Equal(t, "k8s.com/#!/log/spark-namespace/spark-pod/pod?namespace=spark-namespace", info.Logs[0].Uri)
assert.Equal(t, "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.spark-pod;streamFilter=typeLogStreamPrefix", info.Logs[1].Uri)
assert.Equal(t, "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=system_log.var.log.containers.spark-app-name;streamFilter=typeLogStreamPrefix", info.Logs[2].Uri)
assert.Equal(t, "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.spark-app-name;streamFilter=typeLogStreamPrefix", info.Logs[3].Uri)
assert.Equal(t, "https://spark-ui.flyte", info.Logs[4].Uri)

info, err = getEventInfoForSpark(dummySparkApplication(sj.SubmittedState))
assert.NoError(t, err)
assert.Len(t, info.Logs, 1)
assert.Equal(t, "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.spark-app-name;streamFilter=typeLogStreamPrefix", info.Logs[0].Uri)

assert.NoError(t, setSparkConfig(&Config{
SparkHistoryServerURL: "spark-history.flyte",
}))

info, err = getEventInfoForSpark(dummySparkApplication(sj.FailedState))
assert.NoError(t, err)
assert.Len(t, info.Logs, 5)
assert.Equal(t, "spark-history.flyte/history/app-id", info.CustomInfo.Fields[sparkHistoryUI].GetStringValue())
assert.Equal(t, "k8s.com/#!/log/spark-namespace/spark-pod/pod?namespace=spark-namespace", info.Logs[0].Uri)
assert.Equal(t, "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.spark-pod;streamFilter=typeLogStreamPrefix", info.Logs[1].Uri)
Expand All @@ -100,7 +107,7 @@ func TestGetTaskPhase(t *testing.T) {

taskPhase, err = sparkResourceHandler.GetTaskPhase(ctx, nil, dummySparkApplication(sj.SubmittedState))
assert.NoError(t, err)
assert.Equal(t, taskPhase.Phase(), pluginsCore.PhaseQueued)
assert.Equal(t, taskPhase.Phase(), pluginsCore.PhaseInitializing)
assert.NotNil(t, taskPhase.Info())
assert.Nil(t, err)

Expand Down

0 comments on commit e13c7b8

Please sign in to comment.