Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PodSpec should not require primary_container name #1380

Merged
merged 2 commits into from
Dec 21, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 11 additions & 31 deletions plugins/flytekit-k8s-pod/flytekitplugins/pod/task.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional, Tuple, Union

from flyteidl.core import tasks_pb2 as _core_task
Expand All @@ -18,6 +19,7 @@ def _sanitize_resource_name(resource: _task_models.Resources.ResourceEntry) -> s
return _core_task.Resources.ResourceName.Name(resource.name).lower().replace("_", "-")


@dataclass
class Pod(object):
"""
Pod is a platform-wide configuration that uses pod templates. By default, every task is launched as a container in a pod.
Expand All @@ -29,39 +31,17 @@ class Pod(object):
:param Optional[Dict[str, str]] annotations: Annotations are key/value pairs that are attached to arbitrary non-identifying metadata to pod spec.
"""

def __init__(
self,
pod_spec: V1PodSpec,
primary_container_name: str,
labels: Optional[Dict[str, str]] = None,
annotations: Optional[Dict[str, str]] = None,
):
if not pod_spec:
pod_spec: V1PodSpec
primary_container_name: str = _PRIMARY_CONTAINER_NAME_FIELD
labels: Optional[Dict[str, str]] = None
annotations: Optional[Dict[str, str]] = None

def __post_init_(self):
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
if not self.pod_spec:
raise _user_exceptions.FlyteValidationException("A pod spec cannot be undefined")
if not primary_container_name:
if not self.primary_container_name:
raise _user_exceptions.FlyteValidationException("A primary container name cannot be undefined")

self._pod_spec = pod_spec
self._primary_container_name = primary_container_name
self._labels = labels
self._annotations = annotations

@property
def pod_spec(self) -> V1PodSpec:
return self._pod_spec

@property
def primary_container_name(self) -> str:
return self._primary_container_name

@property
def labels(self) -> Optional[Dict[str, str]]:
return self._labels

@property
def annotations(self) -> Optional[Dict[str, str]]:
return self._annotations


class PodFunctionTask(PythonFunctionTask[Pod]):
def __init__(self, task_config: Pod, task_function: Callable, **kwargs):
Expand Down Expand Up @@ -114,7 +94,7 @@ def _serialize_pod_spec(self, settings: SerializationSettings) -> Dict[str, Any]

final_containers.append(container)

self.task_config._pod_spec.containers = final_containers
self.task_config.pod_spec.containers = final_containers

return ApiClient().sanitize_for_serialization(self.task_config.pod_spec)

Expand Down