This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Decouple submit-pod phase + logs visibility #97

Merged · merged 6 commits · Jun 17, 2020
5 changes: 3 additions & 2 deletions go/tasks/pluginmachinery/core/phase.go
@@ -156,8 +156,9 @@ func PhaseInfoQueued(t time.Time, version uint32, reason string) PhaseInfo {
return pi
}

func PhaseInfoInitializing(t time.Time, version uint32, reason string) PhaseInfo {
pi := phaseInfo(PhaseInitializing, version, nil, &TaskInfo{OccurredAt: &t})
func PhaseInfoInitializing(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo {

pi := phaseInfo(PhaseInitializing, version, nil, info)
pi.reason = reason
return pi
}
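For context, a minimal caller sketch (not part of this diff) of the updated constructor. It assumes the lyft/flyteplugins module path and only illustrates that an Initializing phase can now carry a *TaskInfo (and therefore log links), not just a timestamp:

package example

import (
	"time"

	pluginsCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" // module path assumed
)

// initializingWithInfo shows the new four-argument form: the caller supplies a
// *TaskInfo so that whatever is already known (OccurredAt, log links) is
// reported while the task is still initializing.
func initializingWithInfo(reason string) pluginsCore.PhaseInfo {
	now := time.Now()
	info := &pluginsCore.TaskInfo{OccurredAt: &now}
	return pluginsCore.PhaseInfoInitializing(now, pluginsCore.DefaultPhaseVersion, reason, info)
}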
2 changes: 1 addition & 1 deletion go/tasks/pluginmachinery/flytek8s/pod_helper.go
@@ -153,7 +153,7 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) {
// ErrImagePull -> Transitionary phase to ImagePullBackOff
// ContainerCreating -> Image is being downloaded
// PodInitializing -> Init containers are running
return pluginsCore.PhaseInfoInitializing(c.LastTransitionTime.Time, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage)), nil
return pluginsCore.PhaseInfoInitializing(c.LastTransitionTime.Time, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &c.LastTransitionTime.Time}), nil

case "CreateContainerError":
// This happens if for instance the command to the container is incorrect, ie doesn't run
2 changes: 1 addition & 1 deletion go/tasks/plugins/array/core/state.go
@@ -167,7 +167,7 @@ func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idl

switch p, version := state.GetPhase(); p {
case PhaseStart:
phaseInfo = core.PhaseInfoInitializing(t, core.DefaultPhaseVersion, state.GetReason())
phaseInfo = core.PhaseInfoInitializing(t, core.DefaultPhaseVersion, state.GetReason(), &core.TaskInfo{OccurredAt: &t})

case PhasePreLaunch:
version := GetPhaseVersionOffset(p, 1) + version
26 changes: 20 additions & 6 deletions go/tasks/plugins/k8s/spark/spark.go
@@ -213,7 +213,13 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo,
customInfoMap := make(map[string]string)

logConfig := logs.GetLogConfig()
if logConfig.IsKubernetesEnabled && sj.Status.DriverInfo.PodName != "" {

state := sj.Status.AppState.State
isQueued := state == sparkOp.NewState ||
state == sparkOp.PendingSubmissionState ||
state == sparkOp.SubmittedState

if logConfig.IsKubernetesEnabled && !isQueued && sj.Status.DriverInfo.PodName != "" {
k8sLog, err := logUtils.NewKubernetesLogPlugin(logConfig.KubernetesURL).GetTaskLog(
sj.Status.DriverInfo.PodName,
sj.Namespace,
@@ -227,7 +233,7 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo,
taskLogs = append(taskLogs, &k8sLog)
}

if logConfig.IsCloudwatchEnabled {
if logConfig.IsCloudwatchEnabled && !isQueued {
cwUserLogs := core.TaskLog{
Uri: fmt.Sprintf(
"https://console.aws.amazon.com/cloudwatch/home?region=%s#logStream:group=%s;prefix=var.log.containers.%s;streamFilter=typeLogStreamPrefix",
@@ -246,6 +252,14 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo,
Name: "System Logs (via Cloudwatch)",
MessageFormat: core.TaskLog_JSON,
}

taskLogs = append(taskLogs, &cwUserLogs)
taskLogs = append(taskLogs, &cwSystemLogs)

}

if logConfig.IsCloudwatchEnabled {

allUserLogs := core.TaskLog{
Uri: fmt.Sprintf(
"https://console.aws.amazon.com/cloudwatch/home?region=%s#logStream:group=%s;prefix=var.log.containers.%s;streamFilter=typeLogStreamPrefix",
@@ -255,8 +269,6 @@ func getEventInfoForSpark(sj *sparkOp.SparkApplication) (*pluginsCore.TaskInfo,
Name: "Spark-Submit/All User Logs (via Cloudwatch)",
MessageFormat: core.TaskLog_JSON,
}
taskLogs = append(taskLogs, &cwUserLogs)
taskLogs = append(taskLogs, &cwSystemLogs)
taskLogs = append(taskLogs, &allUserLogs)
}

@@ -303,8 +315,10 @@ func (sparkResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8s.

occurredAt := time.Now()
switch app.Status.AppState.State {
case sparkOp.NewState, sparkOp.SubmittedState, sparkOp.PendingSubmissionState:
return pluginsCore.PhaseInfoQueued(occurredAt, pluginsCore.DefaultPhaseVersion, "job submitted"), nil
case sparkOp.NewState:
return pluginsCore.PhaseInfoQueued(occurredAt, pluginsCore.DefaultPhaseVersion, "job queued"), nil
case sparkOp.SubmittedState, sparkOp.PendingSubmissionState:
return pluginsCore.PhaseInfoInitializing(occurredAt, pluginsCore.DefaultPhaseVersion, "job submitted", info), nil
case sparkOp.FailedSubmissionState:
reason := fmt.Sprintf("Spark Job Submission Failed with Error: %s", app.Status.AppState.ErrorMessage)
return pluginsCore.PhaseInfoRetryableFailure(errors.DownstreamSystemError, reason, info), nil
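To summarize the spark.go change, here is a condensed, illustrative sketch (not code from this PR), assuming the spark-on-k8s-operator v1beta1 API types and the lyft/flyteplugins module path: pre-run states suppress per-pod log links, and the old single Queued bucket is split into Queued (new) and Initializing (submitted / pending submission).

package example

import (
	"time"

	sparkOp "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta1" // operator API path/version assumed
	pluginsCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"                             // module path assumed
)

// isQueuedState mirrors the check added to getEventInfoForSpark: in these
// pre-run states the driver pod may not exist yet, so the per-pod Kubernetes
// and Cloudwatch links are withheld and only the aggregate Cloudwatch link is
// emitted.
func isQueuedState(state sparkOp.ApplicationStateType) bool {
	return state == sparkOp.NewState ||
		state == sparkOp.PendingSubmissionState ||
		state == sparkOp.SubmittedState
}

// preRunPhase sketches the split GetTaskPhase now makes: a brand-new app stays
// Queued, while a submitted or pending-submission app reports Initializing
// together with its TaskInfo, so existing log links become visible before the
// driver is running.
func preRunPhase(state sparkOp.ApplicationStateType, info *pluginsCore.TaskInfo) pluginsCore.PhaseInfo {
	occurredAt := time.Now()
	switch state {
	case sparkOp.NewState:
		return pluginsCore.PhaseInfoQueued(occurredAt, pluginsCore.DefaultPhaseVersion, "job queued")
	case sparkOp.SubmittedState, sparkOp.PendingSubmissionState:
		return pluginsCore.PhaseInfoInitializing(occurredAt, pluginsCore.DefaultPhaseVersion, "job submitted", info)
	default:
		// Running, failed and terminal states are handled by the real plugin;
		// this sketch only covers the pre-run split.
		return pluginsCore.PhaseInfoQueued(occurredAt, pluginsCore.DefaultPhaseVersion, "state not covered in sketch")
	}
}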
9 changes: 8 additions & 1 deletion go/tasks/plugins/k8s/spark/spark_test.go
@@ -67,19 +67,26 @@ func TestGetEventInfo(t *testing.T) {
}))
info, err := getEventInfoForSpark(dummySparkApplication(sj.RunningState))
assert.NoError(t, err)
assert.Len(t, info.Logs, 5)
assert.Equal(t, fmt.Sprintf("https://%s", sparkUIAddress), info.CustomInfo.Fields[sparkDriverUI].GetStringValue())
assert.Equal(t, "k8s.com/#!/log/spark-namespace/spark-pod/pod?namespace=spark-namespace", info.Logs[0].Uri)
assert.Equal(t, "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.spark-pod;streamFilter=typeLogStreamPrefix", info.Logs[1].Uri)
assert.Equal(t, "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=system_log.var.log.containers.spark-app-name;streamFilter=typeLogStreamPrefix", info.Logs[2].Uri)
assert.Equal(t, "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.spark-app-name;streamFilter=typeLogStreamPrefix", info.Logs[3].Uri)
assert.Equal(t, "https://spark-ui.flyte", info.Logs[4].Uri)

info, err = getEventInfoForSpark(dummySparkApplication(sj.SubmittedState))
assert.NoError(t, err)
assert.Len(t, info.Logs, 1)
assert.Equal(t, "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.spark-app-name;streamFilter=typeLogStreamPrefix", info.Logs[0].Uri)

assert.NoError(t, setSparkConfig(&Config{
SparkHistoryServerURL: "spark-history.flyte",
}))

info, err = getEventInfoForSpark(dummySparkApplication(sj.FailedState))
assert.NoError(t, err)
assert.Len(t, info.Logs, 5)
assert.Equal(t, "spark-history.flyte/history/app-id", info.CustomInfo.Fields[sparkHistoryUI].GetStringValue())
assert.Equal(t, "k8s.com/#!/log/spark-namespace/spark-pod/pod?namespace=spark-namespace", info.Logs[0].Uri)
assert.Equal(t, "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logStream:group=/kubernetes/flyte;prefix=var.log.containers.spark-pod;streamFilter=typeLogStreamPrefix", info.Logs[1].Uri)
@@ -100,7 +107,7 @@ func TestGetTaskPhase(t *testing.T) {

taskPhase, err = sparkResourceHandler.GetTaskPhase(ctx, nil, dummySparkApplication(sj.SubmittedState))
assert.NoError(t, err)
assert.Equal(t, taskPhase.Phase(), pluginsCore.PhaseQueued)
assert.Equal(t, taskPhase.Phase(), pluginsCore.PhaseInitializing)
assert.NotNil(t, taskPhase.Info())
assert.Nil(t, err)
