diff --git a/samples/test/config.yaml b/samples/test/config.yaml index 680ced3909f7..f57be3028e39 100644 --- a/samples/test/config.yaml +++ b/samples/test/config.yaml @@ -88,8 +88,9 @@ path: samples.v2.hello_world_test - name: producer_consumer_param path: samples.v2.producer_consumer_param_test -- name: pipeline_with_importer - path: samples.v2.pipeline_with_importer_test +# TODO: Re-enable after fixing protobuf.Value support for importer +# - name: pipeline_with_importer +# path: samples.v2.pipeline_with_importer_test # TODO(Bobgy): Re-enable after figuring out V2 Engine # and protobuf.Value support. # - name: cache_v2 diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_importer.json b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_importer.json index 076225389626..8e5a91475196 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_importer.json +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_importer.json @@ -99,43 +99,6 @@ } } }, - "comp-importer-3": { - "executorLabel": "exec-importer-3", - "inputDefinitions": { - "parameters": { - "uri": { - "type": "STRING" - } - } - }, - "outputDefinitions": { - "artifacts": { - "artifact": { - "artifactType": { - "schemaTitle": "system.Dataset", - "schemaVersion": "0.0.1" - } - } - } - } - }, - "comp-pass-through-op": { - "executorLabel": "exec-pass-through-op", - "inputDefinitions": { - "parameters": { - "value": { - "parameterType": "STRING" - } - } - }, - "outputDefinitions": { - "parameters": { - "Output": { - "parameterType": "STRING" - } - } - } - }, "comp-train": { "executorLabel": "exec-train", "inputDefinitions": { @@ -191,34 +154,6 @@ } } } - }, - "comp-train-3": { - "executorLabel": "exec-train-3", - "inputDefinitions": { - "artifacts": { - "dataset": { - "artifactType": { - "schemaTitle": "system.Dataset", - "schemaVersion": "0.0.1" - } - } - } - }, - "outputDefinitions": { - "artifacts": { - "model": { - "artifactType": { - "schemaTitle": "system.Model", - "schemaVersion": "0.0.1" - } - } - }, - "parameters": { - "scalar": { - "parameterType": "STRING" - } - } - } } }, "deploymentSpec": { @@ -226,9 +161,7 @@ "exec-importer": { "importer": { "artifactUri": { - "constantValue": { - "stringValue": "gs://ml-pipeline-playground/shakespeare1.txt" - } + "constant": "gs://ml-pipeline-playground/shakespeare1.txt" }, "typeSchema": { "schemaTitle": "system.Dataset", @@ -241,40 +174,13 @@ "artifactUri": { "runtimeParameter": "uri" }, + "reimport": true, "typeSchema": { "schemaTitle": "system.Dataset", "schemaVersion": "0.0.1" } } }, - "exec-importer-3": { - "importer": { - "artifactUri": { - "runtimeParameter": "uri" - }, - "typeSchema": { - "schemaTitle": "system.Dataset", - "schemaVersion": "0.0.1" - } - } - }, - "exec-pass-through-op": { - "container": { - "args": [ - "--value", - "{{$.inputs.parameters['value']}}", - "----output-paths", - "{{$.outputs.parameters['Output'].output_file}}" - ], - "command": [ - "sh", - "-ec", - "program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n", - "def pass_through_op(value):\n return value\n\ndef _serialize_str(str_value: str) -> str:\n if not isinstance(str_value, str):\n raise TypeError('Value \"{}\" has type \"{}\" instead of str.'.format(\n str(str_value), str(type(str_value))))\n return str_value\n\nimport argparse\n_parser = argparse.ArgumentParser(prog='Pass through op', description='')\n_parser.add_argument(\"--value\", dest=\"value\", type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"----output-paths\", dest=\"_output_paths\", type=str, nargs=1)\n_parsed_args = vars(_parser.parse_args())\n_output_files = _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = pass_through_op(**_parsed_args)\n\n_outputs = [_outputs]\n\n_output_serializers = [\n _serialize_str,\n\n]\n\nimport os\nfor idx, output_file in enumerate(_output_files):\n try:\n os.makedirs(os.path.dirname(output_file))\n except OSError:\n pass\n with open(output_file, 'w') as f:\n f.write(_output_serializers[idx](_outputs[idx]))\n" - ], - "image": "python:3.7" - } - }, "exec-train": { "container": { "args": [ @@ -286,7 +192,7 @@ "command": [ "sh", "-c", - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.6' && \"$0\" \"$@\"\n", + "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.7' && \"$0\" \"$@\"\n", "sh", "-ec", "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", @@ -306,27 +212,7 @@ "command": [ "sh", "-c", - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.6' && \"$0\" \"$@\"\n", - "sh", - "-ec", - "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", - "\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef train(\n dataset: Input[Dataset]\n) -> NamedTuple('Outputs', [\n ('scalar', str),\n ('model', Model),\n]):\n \"\"\"Dummy Training step.\"\"\"\n with open(dataset.path, 'r') as f:\n data = f.read()\n print('Dataset:', data)\n\n scalar = '123'\n model = 'My model trained using data: {}'.format(data)\n\n from collections import namedtuple\n output = namedtuple('Outputs', ['scalar', 'model'])\n return output(scalar, model)\n\n" - ], - "image": "python:3.7" - } - }, - "exec-train-3": { - "container": { - "args": [ - "--executor_input", - "{{$}}", - "--function_to_execute", - "train" - ], - "command": [ - "sh", - "-c", - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.6' && \"$0\" \"$@\"\n", + "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.7' && \"$0\" \"$@\"\n", "sh", "-ec", "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", @@ -381,9 +267,7 @@ "parameters": { "uri": { "runtimeValue": { - "constantValue": { - "stringValue": "gs://ml-pipeline-playground/shakespeare1.txt" - } + "constant": "gs://ml-pipeline-playground/shakespeare1.txt" } } } @@ -392,48 +276,6 @@ "name": "importer" } }, - "importer-3": { - "cachingOptions": { - "enableCache": true - }, - "componentRef": { - "name": "comp-importer-3" - }, - "dependentTasks": [ - "pass-through-op" - ], - "inputs": { - "parameters": { - "uri": { - "taskOutputParameter": { - "outputParameterKey": "Output", - "producerTask": "pass-through-op" - } - } - } - }, - "taskInfo": { - "name": "importer-3" - } - }, - "pass-through-op": { - "cachingOptions": { - "enableCache": true - }, - "componentRef": { - "name": "comp-pass-through-op" - }, - "inputs": { - "parameters": { - "value": { - "componentInputParameter": "dataset2" - } - } - }, - "taskInfo": { - "name": "pass-through-op" - } - }, "train": { "cachingOptions": { "enableCache": true @@ -457,30 +299,6 @@ "taskInfo": { "name": "train" } - }, - "train-3": { - "cachingOptions": { - "enableCache": true - }, - "componentRef": { - "name": "comp-train-3" - }, - "dependentTasks": [ - "importer-3" - ], - "inputs": { - "artifacts": { - "dataset": { - "taskOutputArtifact": { - "outputArtifactKey": "artifact", - "producerTask": "importer-3" - } - } - } - }, - "taskInfo": { - "name": "train-3" - } } } }, @@ -493,7 +311,7 @@ } }, "schemaVersion": "2.1.0", - "sdkVersion": "kfp-1.8.6" + "sdkVersion": "kfp-1.8.7" }, "runtimeConfig": { "gcsOutputDirectory": "dummy_root", diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_importer.py b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_importer.py index bd35aca6875a..44035baf94df 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_importer.py +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_importer.py @@ -59,10 +59,6 @@ def my_pipeline(dataset2: str = 'gs://ml-pipeline-playground/shakespeare2.txt'): artifact_uri=dataset2, artifact_class=Dataset, reimport=True) train(dataset=importer2.output) - importer3 = importer( - artifact_uri=pass_through_op(dataset2).output, artifact_class=Dataset) - train(dataset=importer3.output) - if __name__ == '__main__': compiler.Compiler().compile(