Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
chensun committed Apr 10, 2023
1 parent 95529f8 commit c6cb0af
Showing 1 changed file with 18 additions and 19 deletions.
37 changes: 18 additions & 19 deletions sdk/python/kfp/components/pipeline_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,13 @@ def set_caching_options(self, enable_caching: bool) -> 'PipelineTask':
self._task_spec.enable_caching = enable_caching
return self

def _ensure_container_spec_exists(self) -> None:
"""Ensures that the task has a container spec."""
if self.container_spec is None:
raise ValueError(
'This setting can only be set on single-step components, not pipelines used as components, or special components like importers.'
)

def _validate_cpu_request_limit(self, cpu: str) -> float:
"""Validates cpu request/limit string and converts to its numeric
value.
Expand Down Expand Up @@ -281,11 +288,9 @@ def set_cpu_request(self, cpu: str) -> 'PipelineTask':
Returns:
Self return to allow chained setting calls.
"""
cpu = self._validate_cpu_request_limit(cpu)
self._ensure_container_spec_exists()

if self.container_spec is None:
raise ValueError(
'There is no container specified in implementation')
cpu = self._validate_cpu_request_limit(cpu)

if self.container_spec.resources is not None:
self.container_spec.resources.cpu_request = cpu
Expand All @@ -307,11 +312,9 @@ def set_cpu_limit(self, cpu: str) -> 'PipelineTask':
Returns:
Self return to allow chained setting calls.
"""
cpu = self._validate_cpu_request_limit(cpu)
self._ensure_container_spec_exists()

if self.container_spec is None:
raise ValueError(
'There is no container specified in implementation')
cpu = self._validate_cpu_request_limit(cpu)

if self.container_spec.resources is not None:
self.container_spec.resources.cpu_limit = cpu
Expand All @@ -331,15 +334,13 @@ def set_accelerator_limit(self, limit: int) -> 'PipelineTask':
Returns:
Self return to allow chained setting calls.
"""
self._ensure_container_spec_exists()

if isinstance(limit, str):
if re.match(r'[1-9]\d*$', limit) is None:
raise ValueError(f'{"limit"!r} must be positive integer.')
limit = int(limit)

if self.container_spec is None:
raise ValueError(
'There is no container specified in implementation')

if self.container_spec.resources is not None:
self.container_spec.resources.accelerator_count = limit
else:
Expand Down Expand Up @@ -453,11 +454,9 @@ def set_memory_limit(self, memory: str) -> 'PipelineTask':
Returns:
Self return to allow chained setting calls.
"""
memory = self._validate_memory_request_limit(memory)
self._ensure_container_spec_exists()

if self.container_spec is None:
raise ValueError(
'There is no container specified in implementation')
memory = self._validate_memory_request_limit(memory)

if self.container_spec.resources is not None:
self.container_spec.resources.memory_limit = memory
Expand Down Expand Up @@ -514,9 +513,7 @@ def set_accelerator_type(self, accelerator: str) -> 'PipelineTask':
Returns:
Self return to allow chained setting calls.
"""
if self.container_spec is None:
raise ValueError(
'There is no container specified in implementation')
self._ensure_container_spec_exists()

if self.container_spec.resources is not None:
self.container_spec.resources.accelerator_type = accelerator
Expand Down Expand Up @@ -550,6 +547,8 @@ def set_env_variable(self, name: str, value: str) -> 'PipelineTask':
Returns:
Self return to allow chained setting calls.
"""
self._ensure_container_spec_exists()

if self.container_spec.env is not None:
self.container_spec.env[name] = value
else:
Expand Down

0 comments on commit c6cb0af

Please sign in to comment.