Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
updating semantics to address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw committed Feb 3, 2023
1 parent 8f6f3a3 commit 02854c4
Showing 1 changed file with 17 additions and 8 deletions.
25 changes: 17 additions & 8 deletions go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strings"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

pluginserrors "github.com/flyteorg/flyteplugins/go/tasks/errors"
pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/template"
Expand Down Expand Up @@ -107,6 +109,12 @@ func UpdatePod(taskExecutionMetadata pluginsCore.TaskExecutionMetadata,
ApplyInterruptibleNodeAffinity(taskExecutionMetadata.IsInterruptible(), podSpec)
}

func mergeMapInto(src map[string]string, dst map[string]string) {
for key, value := range src {
dst[key] = value
}
}

// BuildRawPod constructs a PodSpec and ObjectMeta based on the definition passed by the TaskExecutionContext. This
// definition does not include any configuration injected by Flyte.
func BuildRawPod(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (*v1.PodSpec, *metav1.ObjectMeta, string, error) {
Expand All @@ -123,7 +131,8 @@ func BuildRawPod(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (*v
}
primaryContainerName := ""

if taskTemplate.GetContainer() != nil {
switch target := taskTemplate.GetTarget().(type) {
case *core.TaskTemplate_Container:
// handles tasks defined by a single container
c, err := ToK8sContainer(ctx, tCtx)
if err != nil {
Expand All @@ -136,17 +145,17 @@ func BuildRawPod(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (*v
*c,
},
}
} else if taskTemplate.GetK8SPod() != nil {
case *core.TaskTemplate_K8SPod:
// handles pod tasks that marshal the pod spec to the k8s_pod task target.
if taskTemplate.GetK8SPod().PodSpec == nil {
if target.K8SPod.PodSpec == nil {
return nil, nil, "", pluginserrors.Errorf(pluginserrors.BadTaskSpecification,
"Pod tasks with task type version > 1 should specify their target as a K8sPod with a defined pod spec")
}

err := utils.UnmarshalStructToObj(taskTemplate.GetK8SPod().PodSpec, &podSpec)
err := utils.UnmarshalStructToObj(target.K8SPod.PodSpec, &podSpec)
if err != nil {
return nil, nil, "", pluginserrors.Errorf(pluginserrors.BadTaskSpecification,
"Unable to unmarshal task k8s pod [%v], Err: [%v]", taskTemplate.GetK8SPod().PodSpec, err.Error())
"Unable to unmarshal task k8s pod [%v], Err: [%v]", target.K8SPod.PodSpec, err.Error())
}

// get primary container name
Expand All @@ -158,10 +167,10 @@ func BuildRawPod(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (*v

// update annotations and labels
if taskTemplate.GetK8SPod().Metadata != nil {
objectMeta.Annotations = utils.UnionMaps(objectMeta.Annotations, taskTemplate.GetK8SPod().Metadata.Annotations)
objectMeta.Labels = utils.UnionMaps(objectMeta.Labels, taskTemplate.GetK8SPod().Metadata.Labels)
mergeMapInto(target.K8SPod.Metadata.Annotations, objectMeta.Annotations)
mergeMapInto(target.K8SPod.Metadata.Labels, objectMeta.Labels)
}
} else {
default:
return nil, nil, "", pluginserrors.Errorf(pluginserrors.BadTaskSpecification,
"invalid TaskSpecification, unable to determine Pod configuration")
}
Expand Down

0 comments on commit 02854c4

Please sign in to comment.