Skip to content

Commit

Permalink
fix(sdk.v2): Fix importer ignoring reimport setting, and switch to …
Browse files Browse the repository at this point in the history
…Protobuf.Value for import uri. (kubeflow#6827)

* fix importer

* release note

* remove error usage in test sample

* disable importer test
  • Loading branch information
chensun authored and abaland committed May 29, 2022
1 parent f499c25 commit 51cc2a5
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 194 deletions.
5 changes: 3 additions & 2 deletions samples/test/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@
path: samples.v2.hello_world_test
- name: producer_consumer_param
path: samples.v2.producer_consumer_param_test
- name: pipeline_with_importer
path: samples.v2.pipeline_with_importer_test
# TODO: Re-enable after fixing protobuf.Value support for importer
# - name: pipeline_with_importer
# path: samples.v2.pipeline_with_importer_test
# TODO(Bobgy): Re-enable after figuring out V2 Engine
# and protobuf.Value support.
# - name: cache_v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,43 +99,6 @@
}
}
},
"comp-importer-3": {
"executorLabel": "exec-importer-3",
"inputDefinitions": {
"parameters": {
"uri": {
"type": "STRING"
}
}
},
"outputDefinitions": {
"artifacts": {
"artifact": {
"artifactType": {
"schemaTitle": "system.Dataset",
"schemaVersion": "0.0.1"
}
}
}
}
},
"comp-pass-through-op": {
"executorLabel": "exec-pass-through-op",
"inputDefinitions": {
"parameters": {
"value": {
"parameterType": "STRING"
}
}
},
"outputDefinitions": {
"parameters": {
"Output": {
"parameterType": "STRING"
}
}
}
},
"comp-train": {
"executorLabel": "exec-train",
"inputDefinitions": {
Expand Down Expand Up @@ -191,44 +154,14 @@
}
}
}
},
"comp-train-3": {
"executorLabel": "exec-train-3",
"inputDefinitions": {
"artifacts": {
"dataset": {
"artifactType": {
"schemaTitle": "system.Dataset",
"schemaVersion": "0.0.1"
}
}
}
},
"outputDefinitions": {
"artifacts": {
"model": {
"artifactType": {
"schemaTitle": "system.Model",
"schemaVersion": "0.0.1"
}
}
},
"parameters": {
"scalar": {
"parameterType": "STRING"
}
}
}
}
},
"deploymentSpec": {
"executors": {
"exec-importer": {
"importer": {
"artifactUri": {
"constantValue": {
"stringValue": "gs://ml-pipeline-playground/shakespeare1.txt"
}
"constant": "gs://ml-pipeline-playground/shakespeare1.txt"
},
"typeSchema": {
"schemaTitle": "system.Dataset",
Expand All @@ -241,40 +174,13 @@
"artifactUri": {
"runtimeParameter": "uri"
},
"reimport": true,
"typeSchema": {
"schemaTitle": "system.Dataset",
"schemaVersion": "0.0.1"
}
}
},
"exec-importer-3": {
"importer": {
"artifactUri": {
"runtimeParameter": "uri"
},
"typeSchema": {
"schemaTitle": "system.Dataset",
"schemaVersion": "0.0.1"
}
}
},
"exec-pass-through-op": {
"container": {
"args": [
"--value",
"{{$.inputs.parameters['value']}}",
"----output-paths",
"{{$.outputs.parameters['Output'].output_file}}"
],
"command": [
"sh",
"-ec",
"program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n",
"def pass_through_op(value):\n return value\n\ndef _serialize_str(str_value: str) -> str:\n if not isinstance(str_value, str):\n raise TypeError('Value \"{}\" has type \"{}\" instead of str.'.format(\n str(str_value), str(type(str_value))))\n return str_value\n\nimport argparse\n_parser = argparse.ArgumentParser(prog='Pass through op', description='')\n_parser.add_argument(\"--value\", dest=\"value\", type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"----output-paths\", dest=\"_output_paths\", type=str, nargs=1)\n_parsed_args = vars(_parser.parse_args())\n_output_files = _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = pass_through_op(**_parsed_args)\n\n_outputs = [_outputs]\n\n_output_serializers = [\n _serialize_str,\n\n]\n\nimport os\nfor idx, output_file in enumerate(_output_files):\n try:\n os.makedirs(os.path.dirname(output_file))\n except OSError:\n pass\n with open(output_file, 'w') as f:\n f.write(_output_serializers[idx](_outputs[idx]))\n"
],
"image": "python:3.7"
}
},
"exec-train": {
"container": {
"args": [
Expand All @@ -286,7 +192,7 @@
"command": [
"sh",
"-c",
"\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.6' && \"$0\" \"$@\"\n",
"\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.7' && \"$0\" \"$@\"\n",
"sh",
"-ec",
"program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n",
Expand All @@ -306,27 +212,7 @@
"command": [
"sh",
"-c",
"\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.6' && \"$0\" \"$@\"\n",
"sh",
"-ec",
"program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n",
"\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef train(\n dataset: Input[Dataset]\n) -> NamedTuple('Outputs', [\n ('scalar', str),\n ('model', Model),\n]):\n \"\"\"Dummy Training step.\"\"\"\n with open(dataset.path, 'r') as f:\n data = f.read()\n print('Dataset:', data)\n\n scalar = '123'\n model = 'My model trained using data: {}'.format(data)\n\n from collections import namedtuple\n output = namedtuple('Outputs', ['scalar', 'model'])\n return output(scalar, model)\n\n"
],
"image": "python:3.7"
}
},
"exec-train-3": {
"container": {
"args": [
"--executor_input",
"{{$}}",
"--function_to_execute",
"train"
],
"command": [
"sh",
"-c",
"\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.6' && \"$0\" \"$@\"\n",
"\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.7' && \"$0\" \"$@\"\n",
"sh",
"-ec",
"program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n",
Expand Down Expand Up @@ -381,9 +267,7 @@
"parameters": {
"uri": {
"runtimeValue": {
"constantValue": {
"stringValue": "gs://ml-pipeline-playground/shakespeare1.txt"
}
"constant": "gs://ml-pipeline-playground/shakespeare1.txt"
}
}
}
Expand All @@ -392,48 +276,6 @@
"name": "importer"
}
},
"importer-3": {
"cachingOptions": {
"enableCache": true
},
"componentRef": {
"name": "comp-importer-3"
},
"dependentTasks": [
"pass-through-op"
],
"inputs": {
"parameters": {
"uri": {
"taskOutputParameter": {
"outputParameterKey": "Output",
"producerTask": "pass-through-op"
}
}
}
},
"taskInfo": {
"name": "importer-3"
}
},
"pass-through-op": {
"cachingOptions": {
"enableCache": true
},
"componentRef": {
"name": "comp-pass-through-op"
},
"inputs": {
"parameters": {
"value": {
"componentInputParameter": "dataset2"
}
}
},
"taskInfo": {
"name": "pass-through-op"
}
},
"train": {
"cachingOptions": {
"enableCache": true
Expand All @@ -457,30 +299,6 @@
"taskInfo": {
"name": "train"
}
},
"train-3": {
"cachingOptions": {
"enableCache": true
},
"componentRef": {
"name": "comp-train-3"
},
"dependentTasks": [
"importer-3"
],
"inputs": {
"artifacts": {
"dataset": {
"taskOutputArtifact": {
"outputArtifactKey": "artifact",
"producerTask": "importer-3"
}
}
}
},
"taskInfo": {
"name": "train-3"
}
}
}
},
Expand All @@ -493,7 +311,7 @@
}
},
"schemaVersion": "2.1.0",
"sdkVersion": "kfp-1.8.6"
"sdkVersion": "kfp-1.8.7"
},
"runtimeConfig": {
"gcsOutputDirectory": "dummy_root",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ def my_pipeline(dataset2: str = 'gs://ml-pipeline-playground/shakespeare2.txt'):
artifact_uri=dataset2, artifact_class=Dataset, reimport=True)
train(dataset=importer2.output)

importer3 = importer(
artifact_uri=pass_through_op(dataset2).output, artifact_class=Dataset)
train(dataset=importer3.output)


if __name__ == '__main__':
compiler.Compiler().compile(
Expand Down

0 comments on commit 51cc2a5

Please sign in to comment.