From da9a0ef4c56116550d6259999578bb5ce9170d3d Mon Sep 17 00:00:00 2001 From: Flaviu Vadan Date: Fri, 26 May 2023 08:29:26 -0600 Subject: [PATCH] [#595] Improve artifact/parameter access (#639) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **Pull Request Checklist** - [x] Fixes #595 - [ ] Tests added - [x] Documentation/examples added - [x] [Good commit messages](https://cbea.ms/git-commit/) and/or PR title **Description of PR** @elliotgunton and I bounced some thoughts on improving access to parameters/artifact via #595 and #613. This PR adds the implementation @elliotgunton suggested to the template invokator class. Open to having these on a different mixin! --------- Signed-off-by: Flaviu Vadan Signed-off-by: Elliot Gunton Co-authored-by: Sambhav Kothari Co-authored-by: Elliot Gunton --- src/hera/workflows/_mixins.py | 181 ++++++++++++++++++++++++++++++++++ src/hera/workflows/steps.py | 75 +------------- src/hera/workflows/task.py | 70 +------------ 3 files changed, 188 insertions(+), 138 deletions(-) diff --git a/src/hera/workflows/_mixins.py b/src/hera/workflows/_mixins.py index e3b0f7f6b..2f7d23dd5 100644 --- a/src/hera/workflows/_mixins.py +++ b/src/hera/workflows/_mixins.py @@ -54,6 +54,7 @@ VolumeMount, ) from hera.workflows.parameter import MISSING, Parameter +from hera.workflows.protocol import Templatable from hera.workflows.resources import Resources from hera.workflows.user_container import UserContainer from hera.workflows.volume import Volume, _BaseVolume @@ -670,6 +671,45 @@ class TemplateInvocatorSubNodeMixin(BaseMixin): when: Optional[str] = None with_sequence: Optional[Sequence] = None + @property + def _subtype(self) -> str: + raise NotImplementedError + + @property + def id(self) -> str: + """ID of this node.""" + return f"{{{{{self._subtype}.{self.name}.id}}}}" + + @property + def ip(self) -> str: + """IP of this node.""" + return f"{{{{{self._subtype}.{self.name}.ip}}}}" + + @property + def status(self) -> str: + """Status of this node.""" + return f"{{{{{self._subtype}.{self.name}.status}}}}" + + @property + def exit_code(self) -> str: + """ExitCode holds the exit code of a script template.""" + return f"{{{{{self._subtype}.{self.name}.exitCode}}}}" + + @property + def started_at(self) -> str: + """Time at which this node started.""" + return f"{{{{{self._subtype}.{self.name}.startedAt}}}}" + + @property + def finished_at(self) -> str: + """Time at which this node completed.""" + return f"{{{{{self._subtype}.{self.name}.finishedAt}}}}" + + @property + def result(self) -> str: + """Result holds the result (stdout) of a script template.""" + return f"{{{{{self._subtype}.{self.name}.outputs.result}}}}" + @root_validator(pre=False) def _check_values(cls, values): def one(xs: List): @@ -680,6 +720,147 @@ def one(xs: List): raise ValueError("exactly one of ['template', 'template_ref', 'inline'] must be present") return values + def _get_parameters_as(self, name: str, subtype: str) -> Parameter: + """Returns a `Parameter` that represents all the outputs of the specified subtype. + + Parameters + ---------- + name: str + The name of the parameter to search for. + subtype: str + The inheritor subtype field, used to construct the output artifact `from_` reference. + + Returns + ------- + Parameter + The parameter, named based on the given `name`, along with a value that references all outputs. + """ + return Parameter(name=name, value=f"{{{{{subtype}.{self.name}.outputs.parameters}}}}") + + def _get_parameter(self, name: str, subtype: str) -> Parameter: + """Attempts to find the specified parameter in the outputs for the specified subtype. + + Notes + ----- + This is specifically designed to be invoked by inheritors. + + Parameters + ---------- + name: str + The name of the parameter to search for. + subtype: str + The inheritor subtype field, used to construct the output artifact `from_` reference. + + Returns + ------- + Parameter + The parameter if found. + + Raises + ------ + ValueError + When no outputs can be constructed/no outputs are set. + KeyError + When the artifact is not found. + NotImplementedError + When something else other than an `Parameter` is found for the specified name. + """ + if isinstance(self.template, str): + raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}") + + # here, we build the template early to verify that we can get the outputs + if isinstance(self.template, Templatable): + template = self.template._build_template() + else: + template = self.template + + # at this point, we know that the template is a `Template` object + if template.outputs is None: # type: ignore + raise ValueError(f"Cannot get output parameters when the template has no outputs: {template}") + if template.outputs.parameters is None: # type: ignore + raise ValueError(f"Cannot get output parameters when the template has no output parameters: {template}") + parameters = template.outputs.parameters # type: ignore + + obj = next((output for output in parameters if output.name == name), None) + if obj is not None: + return Parameter( + name=obj.name, + value=f"{{{{{subtype}.{self.name}.outputs.parameters.{name}}}}}", + ) + raise KeyError(f"No output parameter named `{name}` found") + + def _get_artifact(self, name: str, subtype: str) -> Artifact: + """Attempts to find the specified artifact in the outputs for the specified subtype. + + Notes + ----- + This is specifically designed to be invoked by inheritors. + + Parameters + ---------- + name: str + The name of the artifact to search for. + subtype: str + The inheritor subtype field, used to construct the output artifact `from_` reference. + + Returns + ------- + Artifact + The artifact if found. + + Raises + ------ + ValueError + When no outputs can be constructed/no outputs are set. + KeyError + When the artifact is not found. + NotImplementedError + When something else other than an `Artifact` is found for the specified name. + """ + if isinstance(self.template, str): + raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}") + + # here, we build the template early to verify that we can get the outputs + if isinstance(self.template, Templatable): + template = self.template._build_template() + else: + template = cast(Template, self.template) + + # at this point, we know that the template is a `Template` object + if template.outputs is None: # type: ignore + raise ValueError(f"Cannot get output artifacts when the template has no outputs: {template}") + elif template.outputs.artifacts is None: # type: ignore + raise ValueError(f"Cannot get output artifacts when the template has no output artifacts: {template}") + artifacts = cast(List[ModelArtifact], template.outputs.artifacts) + + obj = next((output for output in artifacts if output.name == name), None) + if obj is not None: + return Artifact(name=name, path=obj.path, from_=f"{{{{{subtype}.{self.name}.outputs.artifacts.{name}}}}}") + raise KeyError(f"No output artifact named `{name}` found") + + def get_parameters_as(self, name: str) -> Parameter: + """Returns a `Parameter` that represents all the outputs of this subnode. + + Parameters + ---------- + name: str + The name of the parameter to search for. + + Returns + ------- + Parameter + The parameter, named based on the given `name`, along with a value that references all outputs. + """ + return self._get_parameters_as(name=name, subtype=self._subtype) + + def get_artifact(self, name: str) -> Artifact: + """Gets an artifact from the outputs of this subnode""" + return self._get_artifact(name=name, subtype=self._subtype) + + def get_parameter(self, name: str) -> Parameter: + """Gets a parameter from the outputs of this subnode""" + return self._get_parameter(name=name, subtype=self._subtype) + def _get_params_from_source(source: Callable) -> Optional[List[Parameter]]: source_signature: Dict[str, Optional[object]] = {} diff --git a/src/hera/workflows/steps.py b/src/hera/workflows/steps.py index 7d01f6e1a..1dee96f5e 100644 --- a/src/hera/workflows/steps.py +++ b/src/hera/workflows/steps.py @@ -20,7 +20,6 @@ Template as _ModelTemplate, WorkflowStep as _ModelWorkflowStep, ) -from hera.workflows.parameter import Parameter from hera.workflows.protocol import Steppable, Templatable @@ -31,78 +30,14 @@ class Step( ParameterMixin, ItemMixin, ): - """Step is used to run a given template. Must be instantiated under a Steps or Parallel context, - or outside of a Workflow. + """ + Step is used to run a given template. Must be instantiated under a Steps or Parallel context, or + outside a Workflow. """ @property - def id(self) -> str: - return f"{{{{steps.{self.name}.id}}}}" - - @property - def ip(self) -> str: - return f"{{{{steps.{self.name}.ip}}}}" - - @property - def status(self) -> str: - return f"{{{{steps.{self.name}.status}}}}" - - @property - def exit_code(self) -> str: - return f"{{{{steps.{self.name}.exitCode}}}}" - - @property - def started_at(self) -> str: - return f"{{{{steps.{self.name}.startedAt}}}}" - - @property - def finished_at(self) -> str: - return f"{{{{steps.{self.name}.finishedAt}}}}" - - @property - def result(self) -> str: - return f"{{{{steps.{self.name}.outputs.result}}}}" - - def get_parameters_as(self, name): - """Gets all the output parameters from this task""" - return Parameter(name=name, value=f"{{{{steps.{self.name}.outputs.parameters}}}}") - - def get_parameter(self, name: str) -> Parameter: - """Returns a Parameter from the task's outputs based on the name. - - Parameters - ---------- - name: str - The name of the parameter to extract as an output. - - Returns - ------- - Parameter - Parameter with the same name - """ - if isinstance(self.template, str): - raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}") - - # here, we build the template early to verify that we can get the outputs - if isinstance(self.template, Templatable): - template = self.template._build_template() - else: - template = self.template - - # at this point, we know that the template is a `Template` object - if template.outputs is None: # type: ignore - raise ValueError(f"Cannot get output parameters when the template has no outputs: {template}") - if template.outputs.parameters is None: # type: ignore - raise ValueError(f"Cannot get output parameters when the template has no output parameters: {template}") - parameters = template.outputs.parameters # type: ignore - - obj = next((output for output in parameters if output.name == name), None) - if obj is not None: - return Parameter( - name=obj.name, - value=f"{{{{steps.{self.name}.outputs.parameters.{name}}}}}", - ) - raise KeyError(f"No output parameter named `{name}` found") + def _subtype(self) -> str: + return "steps" def _build_as_workflow_step(self) -> _ModelWorkflowStep: _template = None diff --git a/src/hera/workflows/task.py b/src/hera/workflows/task.py index cd5b0e0fa..247ca1d01 100644 --- a/src/hera/workflows/task.py +++ b/src/hera/workflows/task.py @@ -21,7 +21,6 @@ Template, ) from hera.workflows.operator import Operator -from hera.workflows.parameter import Parameter from hera.workflows.protocol import Templatable from hera.workflows.workflow_status import WorkflowStatus @@ -119,73 +118,8 @@ def _get_dependency_tasks(self) -> List[str]: return task_names @property - def id(self) -> str: - return f"{{{{tasks.{self.name}.id}}}}" - - @property - def ip(self) -> str: - return f"{{{{tasks.{self.name}.ip}}}}" - - @property - def status(self) -> str: - return f"{{{{tasks.{self.name}.status}}}}" - - @property - def exit_code(self) -> str: - return f"{{{{tasks.{self.name}.exitCode}}}}" - - @property - def started_at(self) -> str: - return f"{{{{tasks.{self.name}.startedAt}}}}" - - @property - def finished_at(self) -> str: - return f"{{{{tasks.{self.name}.finishedAt}}}}" - - @property - def result(self) -> str: - return f"{{{{tasks.{self.name}.outputs.result}}}}" - - def get_parameters_as(self, name: str) -> Parameter: - """Gets all the output parameters from this task""" - return Parameter(name=name, value=f"{{{{tasks.{self.name}.outputs.parameters}}}}") - - def get_parameter(self, name: str) -> Parameter: - """Returns a Parameter from the task's outputs based on the name. - - Parameters - ---------- - name: str - The name of the parameter to extract as an output. - - Returns - ------- - Parameter - Parameter with the same name - """ - if isinstance(self.template, str): - raise ValueError(f"Cannot get output parameters when the template was set via a name: {self.template}") - - # here, we build the template early to verify that we can get the outputs - if isinstance(self.template, Templatable): - template = self.template._build_template() - else: - template = self.template - - # at this point, we know that the template is a `Template` object - if template.outputs is None: # type: ignore - raise ValueError(f"Cannot get output parameters when the template has no outputs: {template}") - if template.outputs.parameters is None: # type: ignore - raise ValueError(f"Cannot get output parameters when the template has no output parameters: {template}") - parameters = template.outputs.parameters # type: ignore - - obj = next((output for output in parameters if output.name == name), None) - if obj is not None: - return Parameter( - name=obj.name, - value=f"{{{{tasks.{self.name}.outputs.parameters.{name}}}}}", - ) - raise KeyError(f"No output parameter named `{name}` found") + def _subtype(self) -> str: + return "tasks" def next(self, other: Task, operator: Operator = Operator.and_, on: Optional[TaskResult] = None) -> Task: """Set self as a dependency of `other`."""