Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): Add comments to IR YAML file #8467

Merged
merged 20 commits into from
Dec 5, 2022
13 changes: 10 additions & 3 deletions sdk/python/kfp/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

from kfp.compiler import pipeline_spec_builder as builder
from kfp.components import base_component
from kfp.components import graph_component
from kfp.components import yaml_component
from kfp.components.types import type_utils


Expand Down Expand Up @@ -79,5 +77,14 @@ def compile(
pipeline_name=pipeline_name,
pipeline_parameters=pipeline_parameters,
)

if hasattr(pipeline_func, 'description'):
description = pipeline_func.description or None

else:
description = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible I'm missing something, but it looks like this is to handle the fact that GraphComponents have a .description, but other BaseComponents (YamlComponent, PythonComponent) don't. Do you think we could put .description on the BaseComponent abstract base class and implement for all concrete classes, that way all component/pipeline types can be treated the same way by the compiler?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would also allow us to avoid expanding the interface of write_pipeline_spec_to_file to support comments.


builder.write_pipeline_spec_to_file(
pipeline_spec=pipeline_spec, package_path=package_path)
pipeline_spec=pipeline_spec,
pipeline_description=description,
package_path=package_path)
156 changes: 156 additions & 0 deletions sdk/python/kfp/compiler/compiler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1490,5 +1490,161 @@ def pipeline_with_input(boolean: bool = False):
.default_value.bool_value, True)


class TestYamlComments(unittest.TestCase):
    """Tests for the comment header prepended to compiled IR YAML files."""

    @staticmethod
    def _compile_and_read(pipeline_func) -> str:
        # Compile the pipeline to a temporary YAML file and return the file's
        # full text; one read replaces the repeated open/read in each test.
        with tempfile.TemporaryDirectory() as tmpdir:
            pipeline_spec_path = os.path.join(tmpdir, 'output.yaml')
            compiler.Compiler().compile(
                pipeline_func=pipeline_func, package_path=pipeline_spec_path)
            with open(pipeline_spec_path) as f:
                return f.read()

    def test_comments_include_inputs_and_outputs_and_pipeline_name(self):

        @dsl.component
        def identity(string: str, model: bool) -> str:
            return string

        @dsl.pipeline()
        def my_pipeline(sample_input1: bool = True,
                        sample_input2: str = 'string') -> str:
            op1 = identity(string=sample_input2, model=sample_input1)
            result = op1.output
            return result

        yaml_content = self._compile_and_read(my_pipeline)

        inputs_string = '# Inputs: \n# sample_input1: bool [Default: True]\n# sample_input2: str [Default: string]'
        outputs_string = '# Outputs: \n# Output: str'
        name_string = '# Name: my-pipeline'

        # Name, inputs, and outputs must all appear in the comment header.
        self.assertIn(name_string, yaml_content)
        self.assertIn(inputs_string, yaml_content)
        self.assertIn(outputs_string, yaml_content)

    def test_comments_include_definition(self):

        @dsl.component
        def identity(string: str, model: bool) -> str:
            return string

        @dsl.pipeline()
        def pipeline_with_no_definition(sample_input1: bool = True,
                                        sample_input2: str = 'string') -> str:
            op1 = identity(string=sample_input2, model=sample_input1)
            result = op1.output
            return result

        definition_string = '# Description: This is a definition of this pipeline'

        # No pipeline docstring -> no Description line in the comments.
        self.assertNotIn(definition_string,
                         self._compile_and_read(pipeline_with_no_definition))

        @dsl.pipeline()
        def pipeline_with_definition(sample_input1: bool = True,
                                     sample_input2: str = 'string') -> str:
            """This is a definition of this pipeline."""
            op1 = identity(string=sample_input2, model=sample_input1)
            result = op1.output
            return result

        # With a docstring the Description line must be present.
        self.assertIn(definition_string,
                      self._compile_and_read(pipeline_with_definition))

    def test_comments_on_pipeline_with_no_inputs_or_outputs(self):

        @dsl.component
        def identity(string: str, model: bool) -> str:
            return string

        @dsl.pipeline()
        def pipeline_with_no_inputs() -> str:
            op1 = identity(string='string', model=True)
            result = op1.output
            return result

        # A pipeline without inputs omits the "# Inputs:" header entirely.
        self.assertNotIn('# Inputs: \n',
                         self._compile_and_read(pipeline_with_no_inputs))

        @dsl.pipeline()
        def pipeline_with_no_outputs(sample_input1: bool = True,
                                     sample_input2: str = 'string'):
            identity(string=sample_input2, model=sample_input1)

        # A pipeline without outputs omits the "# Outputs:" header entirely.
        self.assertNotIn('# Outputs: \n',
                         self._compile_and_read(pipeline_with_no_outputs))

    def test_comments_follow_pattern(self):

        @dsl.component
        def identity(string: str, model: bool) -> str:
            return string

        @dsl.pipeline()
        def my_pipeline(sample_input1: bool = True,
                        sample_input2: str = 'string') -> str:
            """This is a definition of this pipeline."""
            op1 = identity(string=sample_input2, model=sample_input1)
            result = op1.output
            return result

        yaml_content = self._compile_and_read(my_pipeline)

        # The full header must appear in this exact order:
        # title, name, description, inputs, outputs.
        pattern_sample = '# PIPELINE DEFINITION\n# Name: my-pipeline\n# Description: This is a definition of this pipeline.\n# Inputs: \n# sample_input1: bool [Default: True]\n# sample_input2: str [Default: string]\n# Outputs: \n# Output: str'
        self.assertIn(pattern_sample, yaml_content)


# Allow running this test module directly (e.g. `python compiler_test.py`).
if __name__ == '__main__':
    unittest.main()
91 changes: 91 additions & 0 deletions sdk/python/kfp/compiler/pipeline_spec_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -1604,6 +1604,7 @@ def create_pipeline_spec(


def write_pipeline_spec_to_file(pipeline_spec: pipeline_spec_pb2.PipelineSpec,
pipeline_description: str,
package_path: str) -> None:
"""Writes PipelineSpec into a YAML or JSON (deprecated) file.

Expand All @@ -1612,6 +1613,8 @@ def write_pipeline_spec_to_file(pipeline_spec: pipeline_spec_pb2.PipelineSpec,
package_path (str): The path to which to write the PipelineSpec.
"""
json_dict = json_format.MessageToDict(pipeline_spec)
yaml_comments = extract_comments_from_pipeline_spec(json_dict,
pipeline_description)

if package_path.endswith('.json'):
warnings.warn(
Expand All @@ -1628,6 +1631,94 @@ def write_pipeline_spec_to_file(pipeline_spec: pipeline_spec_pb2.PipelineSpec,
with open(package_path, 'w') as yaml_file:
yaml.dump(json_dict, yaml_file, sort_keys=True)

with open(package_path, 'r+') as f:
old = f.read()
f.seek(0)
f.write(yaml_comments + old)

JOCSTAA marked this conversation as resolved.
Show resolved Hide resolved
else:
raise ValueError(
f'The output path {package_path} should end with ".yaml".')


def extract_comments_from_pipeline_spec(pipeline_spec: dict,
                                        pipeline_description: str) -> str:
    """Builds the comment header describing a pipeline for the IR YAML file.

    Args:
        pipeline_spec (dict): PipelineSpec as a JSON-compatible dict
            (output of json_format.MessageToDict).
        pipeline_description (str): The pipeline's description, or None.

    Returns:
        A block of '#'-prefixed comment lines covering the pipeline's name,
        description, inputs, and outputs, or '' if the spec has no 'root'
        (e.g. an empty PipelineSpec written by low-level tests).
    """
    # Maps IR ParameterType names to their Python type names for display.
    map_parameter_types = {
        'NUMBER_INTEGER': 'int',
        'NUMBER_DOUBLE': 'float',
        'STRING': 'str',
        'BOOLEAN': 'bool',
        'LIST': 'list',
        'STRUCT': 'dict'
    }

    def describe_definitions(section_key: str, header: str) -> str:
        # Shared renderer for 'inputDefinitions' and 'outputDefinitions' --
        # both sections have the same parameters/artifacts structure, so one
        # implementation replaces the previous copy-pasted add_inputs/add_outputs.
        if section_key not in pipeline_spec['root']:
            return ''

        definitions = pipeline_spec['root'][section_key]
        string = header

        for name, parameter in definitions.get('parameters', {}).items():
            string += '# ' + name + ': ' + map_parameter_types[
                parameter['parameterType']]
            if 'defaultValue' in parameter:
                string += ' [Default: ' + str(parameter['defaultValue']) + ']'
            string += '\n'

        for name, artifact in definitions.get('artifacts', {}).items():
            # TODO: raise an exception if schemaTitle does not exist.
            if 'schemaTitle' in artifact['artifactType']:
                string += '# ' + name + ': ' + artifact['artifactType'][
                    'schemaTitle'] + '\n'

        return string

    # An empty PipelineSpec (no 'root') gets no comment header at all.
    if 'root' not in pipeline_spec:
        return ''

    comment = '# PIPELINE DEFINITION\n'
    comment += '# Name: ' + pipeline_spec['pipelineInfo']['name'] + '\n'
    if pipeline_description:
        comment += '# Description: ' + pipeline_description + '\n'
    comment += describe_definitions('inputDefinitions', '# Inputs: \n')
    comment += describe_definitions('outputDefinitions', '# Outputs: \n')

    return comment
8 changes: 4 additions & 4 deletions sdk/python/kfp/compiler/pipeline_spec_builder_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,15 @@ def test_yaml(self):
with tempfile.TemporaryDirectory() as tempdir:
temp_filepath = os.path.join(tempdir, 'output.yaml')
pipeline_spec_builder.write_pipeline_spec_to_file(
self.pipeline_spec, temp_filepath)
self.pipeline_spec, None, temp_filepath)
actual = pipeline_spec_from_file(temp_filepath)
self.assertEqual(actual, self.pipeline_spec)

def test_yml(self):
with tempfile.TemporaryDirectory() as tempdir:
temp_filepath = os.path.join(tempdir, 'output.yml')
pipeline_spec_builder.write_pipeline_spec_to_file(
self.pipeline_spec, temp_filepath)
self.pipeline_spec, None, temp_filepath)
actual = pipeline_spec_from_file(temp_filepath)
self.assertEqual(actual, self.pipeline_spec)

Expand All @@ -288,7 +288,7 @@ def test_json(self):
DeprecationWarning, r'Compiling to JSON is deprecated'):
temp_filepath = os.path.join(tempdir, 'output.json')
pipeline_spec_builder.write_pipeline_spec_to_file(
self.pipeline_spec, temp_filepath)
self.pipeline_spec, None, temp_filepath)
actual = pipeline_spec_from_file(temp_filepath)
self.assertEqual(actual, self.pipeline_spec)

Expand All @@ -297,7 +297,7 @@ def test_incorrect_extension(self):
ValueError, r'should end with "\.yaml"\.'):
temp_filepath = os.path.join(tempdir, 'output.txt')
pipeline_spec_builder.write_pipeline_spec_to_file(
self.pipeline_spec, temp_filepath)
self.pipeline_spec, None, temp_filepath)


if __name__ == '__main__':
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/kfp/components/graph_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ def __init__(
self.pipeline_func = pipeline_func
self.name = name

self.description = component_spec.description
JOCSTAA marked this conversation as resolved.
Show resolved Hide resolved

args_list = []
signature = inspect.signature(pipeline_func)

Expand Down
2 changes: 1 addition & 1 deletion sdk/python/kfp/components/structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ def save_to_component_yaml(self, output_file: str) -> None:
from kfp.compiler import pipeline_spec_builder as builder

pipeline_spec = self.to_pipeline_spec()
builder.write_pipeline_spec_to_file(pipeline_spec, output_file)
builder.write_pipeline_spec_to_file(pipeline_spec, None, output_file)

def to_pipeline_spec(self) -> pipeline_spec_pb2.PipelineSpec:
"""Creates a pipeline instance and constructs the pipeline spec for a
Expand Down