Skip to content

Commit

Permalink
Set PrimaryContainerKey annotation by default (flyteorg#337)
Browse files Browse the repository at this point in the history
* added pod plugin optimizations

Signed-off-by: Daniel Rammer <[email protected]>

* fixed unit tests

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Apr 14, 2023
1 parent b101e65 commit c143eaa
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 4 deletions.
16 changes: 15 additions & 1 deletion flyteplugins/go/tasks/plugins/array/k8s/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,18 @@ func advancePodPhases(ctx context.Context, store *storage.DataStore, outputWrite

for _, pod := range podList.Items {
newPhase := nextHappyPodPhase(pod.Status.Phase)
primaryContainerName := pod.Annotations["primary_container_name"]
if len(primaryContainerName) <= 0 {
primaryContainerName = "foo"
}
pod.Status.ContainerStatuses = []v1.ContainerStatus{
{ContainerID: "cont_123"},
v1.ContainerStatus{
Name: primaryContainerName,
ContainerID: primaryContainerName,
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{},
},
},
}

if pod.Status.Phase != newPhase && newPhase == v1.PodSucceeded {
Expand All @@ -95,6 +105,10 @@ func advancePodPhases(ctx context.Context, store *storage.DataStore, outputWrite
}
}

pod.Status.ContainerStatuses[0].State = v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{},
}

ref := outputWriter.GetOutputPath()
if idx > -1 {
ref, err = store.ConstructReference(ctx, outputWriter.GetOutputPrefixPath(), strconv.Itoa(idx), "outputs.pb")
Expand Down
7 changes: 7 additions & 0 deletions flyteplugins/go/tasks/plugins/array/k8s/management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,13 @@ func TestCheckSubTasksState(t *testing.T) {
pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{Name: "foo"})

pod.Status.Phase = v1.PodRunning
pod.Status.ContainerStatuses = []v1.ContainerStatus{
v1.ContainerStatus{
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{},
},
},
}
_ = fakeKubeClient.Create(ctx, pod)
_ = fakeKubeCache.Create(ctx, pod)
}
Expand Down
25 changes: 25 additions & 0 deletions flyteplugins/go/tasks/plugins/k8s/pod/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) {
ctx := context.TODO()
t.Run("running", func(t *testing.T) {
j.Status.Phase = v1.PodRunning
j.Status.ContainerStatuses = []v1.ContainerStatus{
{
State: v1.ContainerState{
Running: &v1.ContainerStateRunning{},
},
},
}

phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRunning, phaseInfo.Phase())
Expand Down Expand Up @@ -193,6 +201,23 @@ func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) {
assert.Equal(t, "Unschedulable", ec)
})

t.Run("successOptimized", func(t *testing.T) {
j.Status.Phase = v1.PodRunning
j.Status.ContainerStatuses = []v1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &v1.ContainerStateTerminated{
ExitCode: 0,
},
},
},
}

phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseSuccess, phaseInfo.Phase())
})

t.Run("success", func(t *testing.T) {
j.Status.Phase = v1.PodSucceeded
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j)
Expand Down
21 changes: 18 additions & 3 deletions flyteplugins/go/tasks/plugins/k8s/pod/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,10 @@ func (p plugin) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecu
return nil, err
}

// set primary container name if this is executed as a sidecar
if taskTemplate.Type == SidecarTaskType {
// set primaryContainerKey annotation if this is a Sidecar task or, as an optimization, if there is only a single
// container. this plugin marks the task complete if the primary Container is complete, so if there is only one
// container we can mark the task as complete before the Pod has been marked complete.
if taskTemplate.Type == SidecarTaskType || len(podSpec.Containers) == 1 {
objectMeta.Annotations[flytek8s.PrimaryContainerKey] = primaryContainerName
}

Expand Down Expand Up @@ -187,7 +189,20 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin
default:
primaryContainerName, exists := r.GetAnnotations()[flytek8s.PrimaryContainerKey]
if !exists {
// if the primary container annotation dos not exist, then the task requires all containers
// if all of the containers in the Pod are complete, as an optimization, we can declare the task as
// succeeded rather than waiting for the Pod to be marked completed.
allSuccessfullyTerminated := len(pod.Status.ContainerStatuses) > 0
for _, s := range pod.Status.ContainerStatuses {
if s.State.Waiting != nil || s.State.Running != nil || (s.State.Terminated != nil && s.State.Terminated.ExitCode != 0) {
allSuccessfullyTerminated = false
}
}

if allSuccessfullyTerminated {
return flytek8s.DemystifySuccess(pod.Status, info)
}

// if the primary container annotation does not exist, then the task requires all containers
// to succeed to declare success. therefore, if the pod is not in one of the above states we
// fallback to declaring the task as 'running'.
phaseInfo = pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, &info)
Expand Down

0 comments on commit c143eaa

Please sign in to comment.