Skip to content

Commit

Permalink
[#595] Improve artifact/parameter access (#639)
Browse files Browse the repository at this point in the history
<!-- Thank you for submitting a PR to Hera! 🚀 -->

**Pull Request Checklist**
- [x] Fixes #595 
- [ ] Tests added
- [x] Documentation/examples added
- [x] [Good commit messages](https://cbea.ms/git-commit/) and/or PR
title
<!-- Also remember to sign off commits or the DCO check will fail on
your PR! -->

**Description of PR**
<!-- If not linked to an issue, please describe your changes here -->

@elliotgunton and I bounced some thoughts on improving access to
parameters/artifact via #595 and #613. This PR adds the implementation
@elliotgunton suggested to the template invokator class. Open to having
these on a different mixin!

<!-- Piece of cake! ✨🍰✨ -->

---------

Signed-off-by: Flaviu Vadan <[email protected]>
Signed-off-by: Elliot Gunton <[email protected]>
Co-authored-by: Sambhav Kothari <[email protected]>
Co-authored-by: Elliot Gunton <[email protected]>
  • Loading branch information
3 people authored May 26, 2023
1 parent 6bbc07a commit da9a0ef
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 138 deletions.
181 changes: 181 additions & 0 deletions src/hera/workflows/_mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
VolumeMount,
)
from hera.workflows.parameter import MISSING, Parameter
from hera.workflows.protocol import Templatable
from hera.workflows.resources import Resources
from hera.workflows.user_container import UserContainer
from hera.workflows.volume import Volume, _BaseVolume
Expand Down Expand Up @@ -670,6 +671,45 @@ class TemplateInvocatorSubNodeMixin(BaseMixin):
when: Optional[str] = None
with_sequence: Optional[Sequence] = None

@property
def _subtype(self) -> str:
raise NotImplementedError

@property
def id(self) -> str:
"""ID of this node."""
return f"{{{{{self._subtype}.{self.name}.id}}}}"

@property
def ip(self) -> str:
"""IP of this node."""
return f"{{{{{self._subtype}.{self.name}.ip}}}}"

@property
def status(self) -> str:
"""Status of this node."""
return f"{{{{{self._subtype}.{self.name}.status}}}}"

@property
def exit_code(self) -> str:
"""ExitCode holds the exit code of a script template."""
return f"{{{{{self._subtype}.{self.name}.exitCode}}}}"

@property
def started_at(self) -> str:
"""Time at which this node started."""
return f"{{{{{self._subtype}.{self.name}.startedAt}}}}"

@property
def finished_at(self) -> str:
"""Time at which this node completed."""
return f"{{{{{self._subtype}.{self.name}.finishedAt}}}}"

@property
def result(self) -> str:
"""Result holds the result (stdout) of a script template."""
return f"{{{{{self._subtype}.{self.name}.outputs.result}}}}"

@root_validator(pre=False)
def _check_values(cls, values):
def one(xs: List):
Expand All @@ -680,6 +720,147 @@ def one(xs: List):
raise ValueError("exactly one of ['template', 'template_ref', 'inline'] must be present")
return values

def _get_parameters_as(self, name: str, subtype: str) -> Parameter:
"""Returns a `Parameter` that represents all the outputs of the specified subtype.
Parameters
----------
name: str
The name of the parameter to search for.
subtype: str
The inheritor subtype field, used to construct the output artifact `from_` reference.
Returns
-------
Parameter
The parameter, named based on the given `name`, along with a value that references all outputs.
"""
return Parameter(name=name, value=f"{{{{{subtype}.{self.name}.outputs.parameters}}}}")

def _get_parameter(self, name: str, subtype: str) -> Parameter:
"""Attempts to find the specified parameter in the outputs for the specified subtype.
Notes
-----
This is specifically designed to be invoked by inheritors.
Parameters
----------
name: str
The name of the parameter to search for.
subtype: str
The inheritor subtype field, used to construct the output artifact `from_` reference.
Returns
-------
Parameter
The parameter if found.
Raises
------
ValueError
When no outputs can be constructed/no outputs are set.
KeyError
When the artifact is not found.
NotImplementedError
When something else other than an `Parameter` is found for the specified name.
"""
if isinstance(self.template, str):
raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}")

# here, we build the template early to verify that we can get the outputs
if isinstance(self.template, Templatable):
template = self.template._build_template()
else:
template = self.template

# at this point, we know that the template is a `Template` object
if template.outputs is None: # type: ignore
raise ValueError(f"Cannot get output parameters when the template has no outputs: {template}")
if template.outputs.parameters is None: # type: ignore
raise ValueError(f"Cannot get output parameters when the template has no output parameters: {template}")
parameters = template.outputs.parameters # type: ignore

obj = next((output for output in parameters if output.name == name), None)
if obj is not None:
return Parameter(
name=obj.name,
value=f"{{{{{subtype}.{self.name}.outputs.parameters.{name}}}}}",
)
raise KeyError(f"No output parameter named `{name}` found")

def _get_artifact(self, name: str, subtype: str) -> Artifact:
"""Attempts to find the specified artifact in the outputs for the specified subtype.
Notes
-----
This is specifically designed to be invoked by inheritors.
Parameters
----------
name: str
The name of the artifact to search for.
subtype: str
The inheritor subtype field, used to construct the output artifact `from_` reference.
Returns
-------
Artifact
The artifact if found.
Raises
------
ValueError
When no outputs can be constructed/no outputs are set.
KeyError
When the artifact is not found.
NotImplementedError
When something else other than an `Artifact` is found for the specified name.
"""
if isinstance(self.template, str):
raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}")

# here, we build the template early to verify that we can get the outputs
if isinstance(self.template, Templatable):
template = self.template._build_template()
else:
template = cast(Template, self.template)

# at this point, we know that the template is a `Template` object
if template.outputs is None: # type: ignore
raise ValueError(f"Cannot get output artifacts when the template has no outputs: {template}")
elif template.outputs.artifacts is None: # type: ignore
raise ValueError(f"Cannot get output artifacts when the template has no output artifacts: {template}")
artifacts = cast(List[ModelArtifact], template.outputs.artifacts)

obj = next((output for output in artifacts if output.name == name), None)
if obj is not None:
return Artifact(name=name, path=obj.path, from_=f"{{{{{subtype}.{self.name}.outputs.artifacts.{name}}}}}")
raise KeyError(f"No output artifact named `{name}` found")

def get_parameters_as(self, name: str) -> Parameter:
"""Returns a `Parameter` that represents all the outputs of this subnode.
Parameters
----------
name: str
The name of the parameter to search for.
Returns
-------
Parameter
The parameter, named based on the given `name`, along with a value that references all outputs.
"""
return self._get_parameters_as(name=name, subtype=self._subtype)

def get_artifact(self, name: str) -> Artifact:
"""Gets an artifact from the outputs of this subnode"""
return self._get_artifact(name=name, subtype=self._subtype)

def get_parameter(self, name: str) -> Parameter:
"""Gets a parameter from the outputs of this subnode"""
return self._get_parameter(name=name, subtype=self._subtype)


def _get_params_from_source(source: Callable) -> Optional[List[Parameter]]:
source_signature: Dict[str, Optional[object]] = {}
Expand Down
75 changes: 5 additions & 70 deletions src/hera/workflows/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
Template as _ModelTemplate,
WorkflowStep as _ModelWorkflowStep,
)
from hera.workflows.parameter import Parameter
from hera.workflows.protocol import Steppable, Templatable


Expand All @@ -31,78 +30,14 @@ class Step(
ParameterMixin,
ItemMixin,
):
"""Step is used to run a given template. Must be instantiated under a Steps or Parallel context,
or outside of a Workflow.
"""
Step is used to run a given template. Must be instantiated under a Steps or Parallel context, or
outside a Workflow.
"""

@property
def id(self) -> str:
return f"{{{{steps.{self.name}.id}}}}"

@property
def ip(self) -> str:
return f"{{{{steps.{self.name}.ip}}}}"

@property
def status(self) -> str:
return f"{{{{steps.{self.name}.status}}}}"

@property
def exit_code(self) -> str:
return f"{{{{steps.{self.name}.exitCode}}}}"

@property
def started_at(self) -> str:
return f"{{{{steps.{self.name}.startedAt}}}}"

@property
def finished_at(self) -> str:
return f"{{{{steps.{self.name}.finishedAt}}}}"

@property
def result(self) -> str:
return f"{{{{steps.{self.name}.outputs.result}}}}"

def get_parameters_as(self, name):
"""Gets all the output parameters from this task"""
return Parameter(name=name, value=f"{{{{steps.{self.name}.outputs.parameters}}}}")

def get_parameter(self, name: str) -> Parameter:
"""Returns a Parameter from the task's outputs based on the name.
Parameters
----------
name: str
The name of the parameter to extract as an output.
Returns
-------
Parameter
Parameter with the same name
"""
if isinstance(self.template, str):
raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}")

# here, we build the template early to verify that we can get the outputs
if isinstance(self.template, Templatable):
template = self.template._build_template()
else:
template = self.template

# at this point, we know that the template is a `Template` object
if template.outputs is None: # type: ignore
raise ValueError(f"Cannot get output parameters when the template has no outputs: {template}")
if template.outputs.parameters is None: # type: ignore
raise ValueError(f"Cannot get output parameters when the template has no output parameters: {template}")
parameters = template.outputs.parameters # type: ignore

obj = next((output for output in parameters if output.name == name), None)
if obj is not None:
return Parameter(
name=obj.name,
value=f"{{{{steps.{self.name}.outputs.parameters.{name}}}}}",
)
raise KeyError(f"No output parameter named `{name}` found")
def _subtype(self) -> str:
return "steps"

def _build_as_workflow_step(self) -> _ModelWorkflowStep:
_template = None
Expand Down
70 changes: 2 additions & 68 deletions src/hera/workflows/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
Template,
)
from hera.workflows.operator import Operator
from hera.workflows.parameter import Parameter
from hera.workflows.protocol import Templatable
from hera.workflows.workflow_status import WorkflowStatus

Expand Down Expand Up @@ -119,73 +118,8 @@ def _get_dependency_tasks(self) -> List[str]:
return task_names

@property
def id(self) -> str:
return f"{{{{tasks.{self.name}.id}}}}"

@property
def ip(self) -> str:
return f"{{{{tasks.{self.name}.ip}}}}"

@property
def status(self) -> str:
return f"{{{{tasks.{self.name}.status}}}}"

@property
def exit_code(self) -> str:
return f"{{{{tasks.{self.name}.exitCode}}}}"

@property
def started_at(self) -> str:
return f"{{{{tasks.{self.name}.startedAt}}}}"

@property
def finished_at(self) -> str:
return f"{{{{tasks.{self.name}.finishedAt}}}}"

@property
def result(self) -> str:
return f"{{{{tasks.{self.name}.outputs.result}}}}"

def get_parameters_as(self, name: str) -> Parameter:
"""Gets all the output parameters from this task"""
return Parameter(name=name, value=f"{{{{tasks.{self.name}.outputs.parameters}}}}")

def get_parameter(self, name: str) -> Parameter:
"""Returns a Parameter from the task's outputs based on the name.
Parameters
----------
name: str
The name of the parameter to extract as an output.
Returns
-------
Parameter
Parameter with the same name
"""
if isinstance(self.template, str):
raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}")

# here, we build the template early to verify that we can get the outputs
if isinstance(self.template, Templatable):
template = self.template._build_template()
else:
template = self.template

# at this point, we know that the template is a `Template` object
if template.outputs is None: # type: ignore
raise ValueError(f"Cannot get output parameters when the template has no outputs: {template}")
if template.outputs.parameters is None: # type: ignore
raise ValueError(f"Cannot get output parameters when the template has no output parameters: {template}")
parameters = template.outputs.parameters # type: ignore

obj = next((output for output in parameters if output.name == name), None)
if obj is not None:
return Parameter(
name=obj.name,
value=f"{{{{tasks.{self.name}.outputs.parameters.{name}}}}}",
)
raise KeyError(f"No output parameter named `{name}` found")
def _subtype(self) -> str:
return "tasks"

def next(self, other: Task, operator: Operator = Operator.and_, on: Optional[TaskResult] = None) -> Task:
"""Set self as a dependency of `other`."""
Expand Down

0 comments on commit da9a0ef

Please sign in to comment.