Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): add runtime logic for custom artifact types (support for custom artifact types pt. 3) #8233

Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/python/kfp/components/component_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from kfp.components import structures
from kfp.components.container_component_artifact_channel import \
ContainerComponentArtifactChannel
from kfp.components.types import custom_artifact_types
from kfp.components.types import type_annotations
from kfp.components.types import type_utils

Expand Down Expand Up @@ -322,7 +323,7 @@ def _get_command_and_args_for_lightweight_component(
'from kfp import dsl',
'from kfp.dsl import *',
'from typing import *',
]
] + custom_artifact_types.get_custom_artifact_type_import_statements(func)

func_source = _get_function_source_definition(func)
source = textwrap.dedent('''
Expand Down
4 changes: 4 additions & 0 deletions sdk/python/kfp/components/component_factory_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,7 @@ def test_with_packages_to_install_with_pip_index_url(self):
concat_command = ' '.join(command)
for package in packages_to_install + pip_index_urls:
self.assertTrue(package in concat_command)


if __name__ == '__main__':
unittest.main()
60 changes: 47 additions & 13 deletions sdk/python/kfp/components/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
import inspect
import json
import os
from typing import Any, Callable, Dict, List, Optional, Union

from kfp.components import task_final_status
Expand All @@ -37,30 +38,46 @@ def __init__(self, executor_input: Dict, function_to_execute: Callable):
{}).get('artifacts', {}).items():
artifacts_list = artifacts.get('artifacts')
if artifacts_list:
self._input_artifacts[name] = self._make_input_artifact(
artifacts_list[0])
self._input_artifacts[name] = self.make_artifact(
artifacts_list[0],
name,
self._func,
)

for name, artifacts in self._input.get('outputs',
{}).get('artifacts', {}).items():
artifacts_list = artifacts.get('artifacts')
if artifacts_list:
self._output_artifacts[name] = self._make_output_artifact(
artifacts_list[0])
output_artifact = self.make_artifact(
artifacts_list[0],
name,
self._func,
)
self._output_artifacts[name] = output_artifact
self.makedirs_recursively(output_artifact.path)

self._return_annotation = inspect.signature(
self._func).return_annotation
self._executor_output = {}

@classmethod
def _make_input_artifact(cls, runtime_artifact: Dict):
return artifact_types.create_runtime_artifact(runtime_artifact)
def make_artifact(
self,
runtime_artifact: Dict,
name: str,
func: Callable,
) -> Any:
artifact_cls = func.__annotations__.get(name)
if type_annotations.is_artifact(artifact_cls):
connor-mccarthy marked this conversation as resolved.
Show resolved Hide resolved
# handles artifacts
return create_artifact_instance(
runtime_artifact, artifact_cls=artifact_cls)
else:
# handles InputPath and OutputPath
return create_artifact_instance(
connor-mccarthy marked this conversation as resolved.
Show resolved Hide resolved
runtime_artifact, artifact_cls=artifact_types.Artifact)

@classmethod
def _make_output_artifact(cls, runtime_artifact: Dict):
import os
artifact = artifact_types.create_runtime_artifact(runtime_artifact)
os.makedirs(os.path.dirname(artifact.path), exist_ok=True)
return artifact
def makedirs_recursively(self, path: str) -> None:
os.makedirs(os.path.dirname(path), exist_ok=True)

def _get_input_artifact(self, name: str):
return self._input_artifacts.get(name)
Expand Down Expand Up @@ -297,3 +314,20 @@ def execute(self):

result = self._func(**func_kwargs)
self._write_executor_output(result)


def create_artifact_instance(
runtime_artifact: Dict,
artifact_cls=None,
connor-mccarthy marked this conversation as resolved.
Show resolved Hide resolved
) -> artifact_types.Artifact:
"""Creates an artifact class instances from a runtime artifact
dictionary."""
schema_title = runtime_artifact.get('type', {}).get('schemaTitle', '')

artifact_cls = artifact_types._SCHEMA_TITLE_TO_TYPE.get(
schema_title) or artifact_cls or artifact_types.Artifact
return artifact_cls(
uri=runtime_artifact.get('uri', ''),
name=runtime_artifact.get('name', ''),
metadata=runtime_artifact.get('metadata', {}),
)
Loading