Skip to content

Commit

Permalink
Add demystify success for OOMKilled (flyteorg#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored Jan 23, 2020
1 parent f021ac4 commit 93e0b8d
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 5 deletions.
18 changes: 18 additions & 0 deletions go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package flytek8s
import (
"context"
"fmt"
"strings"
"time"

"github.com/lyft/flytestdlib/logger"
Expand All @@ -14,6 +15,7 @@ import (
)

const PodKind = "pod"
const OOMKilled = "OOMKilled"

func ToK8sPodSpec(ctx context.Context, taskExecutionMetadata pluginsCore.TaskExecutionMetadata, taskReader pluginsCore.TaskReader,
inputs io.InputReader, outputPaths io.OutputFilePaths) (*v1.PodSpec, error) {
Expand Down Expand Up @@ -167,6 +169,22 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) {
return pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "Scheduling"), nil
}

func DemystifySuccess(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCore.PhaseInfo, error) {
for _, status := range status.ContainerStatuses {
if status.State.Terminated != nil && strings.Contains(status.State.Terminated.Reason, OOMKilled) {
return pluginsCore.PhaseInfoRetryableFailure("OOMKilled",
"Pod reported success despite being OOMKilled", &info), nil
}
}
for _, status := range status.InitContainerStatuses {
if status.State.Terminated != nil && strings.Contains(status.State.Terminated.Reason, OOMKilled) {
return pluginsCore.PhaseInfoRetryableFailure("OOMKilled",
"Pod reported success despite being OOMKilled", &info), nil
}
}
return pluginsCore.PhaseInfoSuccess(&info), nil
}

func ConvertPodFailureToError(status v1.PodStatus) (code, message string) {
code = "UnknownError"
message = "Container/Pod failed. No message received from kubernetes."
Expand Down
44 changes: 43 additions & 1 deletion go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -338,6 +338,48 @@ func TestDemystifyPending(t *testing.T) {
})
}

func TestDemystifySuccess(t *testing.T) {
t.Run("OOMKilled", func(t *testing.T) {
phaseInfo, err := DemystifySuccess(v1.PodStatus{
ContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
Reason: OOMKilled,
},
},
},
},
}, pluginsCore.TaskInfo{})
assert.Nil(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
assert.Equal(t, "OOMKilled", phaseInfo.Err().Code)
})

t.Run("InitContainer OOMKilled", func(t *testing.T) {
phaseInfo, err := DemystifySuccess(v1.PodStatus{
InitContainerStatuses: []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
Reason: OOMKilled,
},
},
},
},
}, pluginsCore.TaskInfo{})
assert.Nil(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
assert.Equal(t, "OOMKilled", phaseInfo.Err().Code)
})

t.Run("success", func(t *testing.T) {
phaseInfo, err := DemystifySuccess(v1.PodStatus{}, pluginsCore.TaskInfo{})
assert.Nil(t, err)
assert.Equal(t, pluginsCore.PhaseSuccess, phaseInfo.Phase())
})
}

func TestConvertPodFailureToError(t *testing.T) {
t.Run("unknown-error", func(t *testing.T) {
code, _ := ConvertPodFailureToError(v1.PodStatus{})
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/array/k8s/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func CheckPodStatus(ctx context.Context, client core.KubeClient, name k8sTypes.N
}
switch pod.Status.Phase {
case v1.PodSucceeded:
return core.PhaseInfoSuccess(&taskInfo), nil
return flytek8s.DemystifySuccess(pod.Status, taskInfo)
case v1.PodFailed:
code, message := flytek8s.ConvertPodFailureToError(pod.Status)
return core.PhaseInfoRetryableFailure(code, message, &taskInfo), nil
Expand Down
4 changes: 2 additions & 2 deletions go/tasks/plugins/k8s/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"

"github.com/lyft/flyteplugins/go/tasks/logs"
pluginsCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
Expand Down Expand Up @@ -37,7 +37,7 @@ func (containerTaskExecutor) GetTaskPhase(ctx context.Context, pluginContext k8s
}
switch pod.Status.Phase {
case v1.PodSucceeded:
return pluginsCore.PhaseInfoSuccess(&info), nil
return flytek8s.DemystifySuccess(pod.Status, info)
case v1.PodFailed:
code, message := flytek8s.ConvertPodFailureToError(pod.Status)
return pluginsCore.PhaseInfoRetryableFailure(code, message, &info), nil
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/k8s/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (sidecarResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8
}
switch pod.Status.Phase {
case k8sv1.PodSucceeded:
return pluginsCore.PhaseInfoSuccess(&info), nil
return flytek8s.DemystifySuccess(pod.Status, info)
case k8sv1.PodFailed:
code, message := flytek8s.ConvertPodFailureToError(pod.Status)
return pluginsCore.PhaseInfoRetryableFailure(code, message, &info), nil
Expand Down

0 comments on commit 93e0b8d

Please sign in to comment.