-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sdk): Add comments to IR YAML file #8467
Changes from 4 commits
c0e10fa
a214b0c
ce06c9d
cf13883
c35c194
a5853e3
03849d6
aeb8f8b
16e1740
9493b0a
67d54e6
bfc7b46
6914577
30ca08e
331b261
f940d4b
d289edd
5b16589
8fbebfa
f4a7fa5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
import re | ||
import subprocess | ||
import tempfile | ||
import textwrap | ||
from typing import Any, Dict, List, NamedTuple, Optional | ||
import unittest | ||
|
||
|
@@ -29,6 +30,11 @@ | |
from kfp.cli import cli | ||
from kfp.compiler import compiler | ||
from kfp.components.types import type_utils | ||
from kfp.dsl import ContainerSpec | ||
from kfp.dsl import Input | ||
from kfp.dsl import Model | ||
from kfp.dsl import Output | ||
from kfp.dsl import OutputPath | ||
from kfp.dsl import PipelineTaskFinalStatus | ||
from kfp.pipeline_spec import pipeline_spec_pb2 | ||
import yaml | ||
|
@@ -1490,5 +1496,274 @@ def pipeline_with_input(boolean: bool = False): | |
.default_value.bool_value, True) | ||
|
||
|
||
class TestYamlComments(unittest.TestCase): | ||
|
||
def test_comments_include_inputs_and_outputs_and_pipeline_name(self): | ||
|
||
@dsl.component | ||
def identity(string: str, model: bool) -> str: | ||
return string | ||
|
||
@dsl.pipeline() | ||
def my_pipeline(sample_input1: bool = True, | ||
sample_input2: str = 'string') -> str: | ||
op1 = identity(string=sample_input2, model=sample_input1) | ||
result = op1.output | ||
return result | ||
|
||
with tempfile.TemporaryDirectory() as tmpdir: | ||
pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') | ||
compiler.Compiler().compile( | ||
pipeline_func=my_pipeline, package_path=pipeline_spec_path) | ||
with open(pipeline_spec_path, 'r+') as f: | ||
yaml_content = f.read() | ||
JOCSTAA marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
inputs_string = '# Inputs:\n# sample_input1: bool [Default: True]\n# sample_input2: str [Default: string]' | ||
outputs_string = '# Outputs:\n# Output: str' | ||
name_string = '# Name: my-pipeline' | ||
|
||
# test name is in comments | ||
JOCSTAA marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.assertTrue(name_string in yaml_content) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: What you have is perfectly valid, but perhaps use the approach of comparing the full comment string throughout (what you've done in the last test method). It's easier to reason about the correctness of the tests that compare the full comment string. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do feel like having it this way for these first few tests gives more specificity as to what exactly we are searching for in the comments, but if you do have a strong opinion against it, i could always implement it that way There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not a strong opinion. We can leave it as is if you prefer. |
||
|
||
# test inputs are in comments | ||
self.assertTrue(inputs_string in yaml_content) | ||
|
||
# test outputs are in comments | ||
self.assertTrue(outputs_string in yaml_content) | ||
|
||
def test_comments_include_definition(self): | ||
|
||
@dsl.component | ||
def identity(string: str, model: bool) -> str: | ||
return string | ||
|
||
@dsl.pipeline() | ||
def pipeline_with_no_definition(sample_input1: bool = True, | ||
sample_input2: str = 'string') -> str: | ||
op1 = identity(string=sample_input2, model=sample_input1) | ||
result = op1.output | ||
return result | ||
|
||
with tempfile.TemporaryDirectory() as tmpdir: | ||
pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') | ||
compiler.Compiler().compile( | ||
pipeline_func=pipeline_with_no_definition, | ||
package_path=pipeline_spec_path) | ||
with open(pipeline_spec_path, 'r+') as f: | ||
yaml_content = f.read() | ||
|
||
definition_string = '# Description: This is a definition of this pipeline' | ||
JOCSTAA marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# test definition not in comments | ||
self.assertFalse(definition_string in yaml_content) | ||
|
||
@dsl.pipeline() | ||
def pipeline_with_definition(sample_input1: bool = True, | ||
sample_input2: str = 'string') -> str: | ||
"""This is a definition of this pipeline.""" | ||
op1 = identity(string=sample_input2, model=sample_input1) | ||
result = op1.output | ||
return result | ||
|
||
with tempfile.TemporaryDirectory() as tmpdir: | ||
pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') | ||
compiler.Compiler().compile( | ||
pipeline_func=pipeline_with_definition, | ||
package_path=pipeline_spec_path) | ||
|
||
with open(pipeline_spec_path, 'r+') as f: | ||
yaml_content = f.read() | ||
|
||
definition_string = '# Description: This is a definition of this pipeline' | ||
|
||
# test definition in comments | ||
self.assertTrue(definition_string in yaml_content) | ||
|
||
def test_comments_on_pipeline_with_no_inputs_or_outputs(self): | ||
|
||
@dsl.component | ||
def identity(string: str, model: bool) -> str: | ||
return string | ||
|
||
@dsl.pipeline() | ||
def pipeline_with_no_inputs() -> str: | ||
op1 = identity(string='string', model=True) | ||
result = op1.output | ||
return result | ||
|
||
with tempfile.TemporaryDirectory() as tmpdir: | ||
pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') | ||
compiler.Compiler().compile( | ||
pipeline_func=pipeline_with_no_inputs, | ||
package_path=pipeline_spec_path) | ||
|
||
with open(pipeline_spec_path, 'r+') as f: | ||
yaml_content = f.read() | ||
|
||
inputs_string = '# Inputs:\n' | ||
JOCSTAA marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# test inputs header not in comments | ||
self.assertFalse(inputs_string in yaml_content) | ||
|
||
@dsl.pipeline() | ||
def pipeline_with_no_outputs(sample_input1: bool = True, | ||
sample_input2: str = 'string'): | ||
identity(string=sample_input2, model=sample_input1) | ||
|
||
with tempfile.TemporaryDirectory() as tmpdir: | ||
pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') | ||
compiler.Compiler().compile( | ||
pipeline_func=pipeline_with_no_outputs, | ||
package_path=pipeline_spec_path) | ||
|
||
with open(pipeline_spec_path, 'r+') as f: | ||
yaml_content = f.read() | ||
|
||
outputs_string = '# Outputs:\n' | ||
|
||
# test outputs header not in comments | ||
self.assertFalse(outputs_string in yaml_content) | ||
|
||
def test_comments_follow_pattern(self): | ||
|
||
@dsl.component | ||
def identity(string: str, model: bool) -> str: | ||
return string | ||
|
||
@dsl.pipeline() | ||
def my_pipeline(sample_input1: bool = True, | ||
sample_input2: str = 'string') -> str: | ||
"""This is a definition of this pipeline.""" | ||
op1 = identity(string=sample_input2, model=sample_input1) | ||
result = op1.output | ||
return result | ||
|
||
with tempfile.TemporaryDirectory() as tmpdir: | ||
pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') | ||
compiler.Compiler().compile( | ||
pipeline_func=my_pipeline, package_path=pipeline_spec_path) | ||
|
||
with open(pipeline_spec_path, 'r+') as f: | ||
yaml_content = f.read() | ||
|
||
pattern_sample = textwrap.dedent("""\ | ||
# PIPELINE DEFINITION | ||
# Name: my-pipeline | ||
# Description: This is a definition of this pipeline. | ||
# Inputs: | ||
# sample_input1: bool [Default: True] | ||
# sample_input2: str [Default: string] | ||
JOCSTAA marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# Outputs: | ||
# Output: str | ||
""") | ||
|
||
# test comment follows pattern | ||
self.assertTrue(pattern_sample in yaml_content) | ||
JOCSTAA marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def test_verbose_comment_characteristics(self): | ||
|
||
@dsl.component | ||
def output_model(metrics: Output[Model]): | ||
"""Dummy component that outputs metrics with a random accuracy.""" | ||
import random | ||
result = random.randint(0, 100) | ||
metrics.log_metric('accuracy', result) | ||
|
||
@dsl.pipeline(name='Test pipeline') | ||
def my_pipeline(sample_input1: bool, | ||
sample_input2: str, | ||
sample_input3: Input[Model], | ||
sample_input4: float = 3.14, | ||
sample_input5: list = [1, 2, 3], | ||
sample_input6: dict = { | ||
'one': 1, | ||
'two': 2, | ||
'three': 3 | ||
}, | ||
sample_input7: int = 5) -> Model: | ||
"""This is a definition of this pipeline.""" | ||
|
||
task = output_model() | ||
return task.output | ||
|
||
with tempfile.TemporaryDirectory() as tmpdir: | ||
pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') | ||
compiler.Compiler().compile( | ||
pipeline_func=my_pipeline, package_path=pipeline_spec_path) | ||
|
||
with open(pipeline_spec_path, 'r+') as f: | ||
yaml_content = f.read() | ||
|
||
predicted_comment = textwrap.dedent("""\ | ||
# PIPELINE DEFINITION | ||
# Name: test-pipeline | ||
# Description: This is a definition of this pipeline. | ||
# Inputs: | ||
# sample_input1: bool | ||
# sample_input2: str | ||
# sample_input4: float [Default: 3.14] | ||
# sample_input5: list [Default: [1.0, 2.0, 3.0]] | ||
# sample_input6: dict [Default: {'one': 1.0, 'two': 2.0, 'three': 3.0}] | ||
# sample_input7: int [Default: 5.0] | ||
# sample_input3: system.Model | ||
# Outputs: | ||
# Output: system.Model | ||
""") | ||
|
||
# test predicted comment matches actual comment | ||
self.assertTrue(predicted_comment in yaml_content) | ||
|
||
def test_comments_on_compiled_components(self): | ||
|
||
@dsl.component | ||
def my_component(string: str, model: bool) -> str: | ||
return string | ||
|
||
with tempfile.TemporaryDirectory() as tmpdir: | ||
pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') | ||
compiler.Compiler().compile( | ||
pipeline_func=my_component, package_path=pipeline_spec_path) | ||
|
||
with open(pipeline_spec_path, 'r+') as f: | ||
yaml_content = f.read() | ||
|
||
predicted_comment = textwrap.dedent("""\ | ||
# PIPELINE DEFINITION | ||
# Name: my-component | ||
# Inputs: | ||
# string: str | ||
# model: bool | ||
""") | ||
JOCSTAA marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# test comments work on compiled components | ||
self.assertTrue(predicted_comment in yaml_content) | ||
|
||
@dsl.container_component | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider adding a description test for the container component also. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a description to the current container component test case, that way it tests all properties. If you have a strong opinion in making it independent i could also do that too There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SGTM |
||
def my_container_component(text: str, output_path: OutputPath(str)): | ||
return ContainerSpec( | ||
image='python:3.7', | ||
command=['my_program', text], | ||
args=['--output_path', output_path]) | ||
|
||
with tempfile.TemporaryDirectory() as tmpdir: | ||
pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') | ||
compiler.Compiler().compile( | ||
pipeline_func=my_container_component, | ||
package_path=pipeline_spec_path) | ||
|
||
with open(pipeline_spec_path, 'r+') as f: | ||
yaml_content = f.read() | ||
|
||
predicted_comment = textwrap.dedent("""\ | ||
# PIPELINE DEFINITION | ||
# Name: my-container-component | ||
# Inputs: | ||
# text: str | ||
""") | ||
|
||
# test comments work on compiled container components | ||
self.assertTrue(predicted_comment in yaml_content) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests look good! A couple more test cases come to mind:
@dsl.component
@dsl.container_component
name
argumentfloat
,int
,dict
, andlist
I don't think these each necessarily must be their own test (though that's fine too -- it's more specific but also more verbose).
Bundling these characteristics together and doing full comment assertions (comparing the actual full comment against an expected full comment) could make tackling these many cases a bit more manageable and similar to real-world components we'll be handling.