Skip to content

Commit

Permalink
Update compiler and pipeline tests to match new structure
Browse files Browse the repository at this point in the history
  • Loading branch information
GeorgesLorre committed Aug 8, 2023
1 parent f59b791 commit c12fd20
Show file tree
Hide file tree
Showing 8 changed files with 570 additions and 34 deletions.
9 changes: 6 additions & 3 deletions src/fondant/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,10 @@ def _resolve_imports(self):

self.kfp = kfp
except ImportError:
msg = """You need to install kfp to use the Kubeflow compiler,\n
you can install it with `pip install --extras kfp`"""
raise ImportError(
"You need to install kfp to use the Kubeflow compiler, "
/ "you can install it with `pip install --extras kfp`",
msg,
)

def compile(
Expand All @@ -239,8 +240,10 @@ def compile(
output_path: the path where to save the Kubeflow pipeline spec
"""
self.pipeline = pipeline
self.pipeline.sort_graph()
self.pipeline._validate_pipeline_definition("{{workflow.name}}")
logger.info(f"Compiling {self.pipeline.name} to {output_path}")
wrapped_pipeline = self.kfp.dsl.pipeline(self.kf_pipeline) # type: ignore
wrapped_pipeline = (self.kfp.dsl.pipeline())(self.kf_pipeline) # type: ignore
self.kfp.compiler.Compiler().compile(wrapped_pipeline, output_path) # type: ignore
logger.info("Pipeline compiled successfully")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: kf-pipeline-
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22, pipelines.kubeflow.org/pipeline_compilation_time: '2023-01-01T00:00:00',
pipelines.kubeflow.org/pipeline_spec: '{"name": "Kf pipeline"}'}
labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.22}
spec:
entrypoint: kf-pipeline
templates:
- name: first-component
container:
args: []
command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data,
--metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec,
'{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}},
"description": "This is an example component", "image": "example_component:latest",
"name": "First component", "produces": {"captions": {"fields": {"data":
{"type": "string"}}}, "images": {"fields": {"data": {"type": "binary"}}}}}',
--input_partition_rows, disable, --output_partition_size, disable, --storage_args,
a dummy string arg, --output_manifest_path, /tmp/outputs/output_manifest_path/data]
image: example_component:latest
inputs:
artifacts:
- name: input_manifest_path
path: /tmp/inputs/input_manifest_path/data
raw: {data: ''}
outputs:
artifacts:
- {name: first-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.22
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This
is an example component", "implementation": {"container": {"command": ["python3",
"main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"},
"--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue":
"component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"},
"--output_partition_size", {"inputValue": "output_partition_size"}, "--storage_args",
{"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath":
"output_manifest_path"}], "image": "example_component:latest"}}, "inputs":
[{"description": "Path to the input manifest", "name": "input_manifest_path",
"type": "String"}, {"description": "Metadata arguments containing the run
id and base path", "name": "metadata", "type": "String"}, {"default": "None",
"description": "The component specification as a dictionary", "name": "component_spec",
"type": "JsonObject"}, {"default": "None", "description": "The number of
rows to load per partition. Set to override the automatic partitioning",
"name": "input_partition_rows", "type": "String"}, {"default": "None", "description":
"The size of the output partition size, defaults to 250MB. Set to `disable`
to disable the automatic partitioning", "name": "output_partition_size",
"type": "String"}, {"description": "Storage arguments", "name": "storage_args",
"type": "String"}], "name": "First component", "outputs": [{"description":
"Path to the output manifest", "name": "output_manifest_path", "type": "String"}]}',
pipelines.kubeflow.org/component_ref: '{"digest": "561ddfe38aa8378f4ea92b26ef6bdeb53b1e9b2fc3c0908800738c304fdca30a"}',
pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\":
{\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}},
\"description\": \"This is an example component\", \"image\": \"example_component:latest\",
\"name\": \"First component\", \"produces\": {\"captions\": {\"fields\":
{\"data\": {\"type\": \"string\"}}}, \"images\": {\"fields\": {\"data\":
{\"type\": \"binary\"}}}}}", "input_partition_rows": "disable", "metadata":
"{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}", "output_partition_size":
"disable", "storage_args": "a dummy string arg"}'}
- name: kf-pipeline
dag:
tasks:
- {name: first-component, template: first-component}
- name: second-component
template: second-component
dependencies: [first-component]
arguments:
artifacts:
- {name: first-component-output_manifest_path, from: '{{tasks.first-component.outputs.artifacts.first-component-output_manifest_path}}'}
- name: third-component
template: third-component
dependencies: [second-component]
arguments:
artifacts:
- {name: second-component-output_manifest_path, from: '{{tasks.second-component.outputs.artifacts.second-component-output_manifest_path}}'}
- name: second-component
container:
args: []
command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data,
--metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec,
'{"args": {"storage_args": {"description": "Storage arguments", "type": "str"}},
"consumes": {"images": {"fields": {"data": {"type": "binary"}}}}, "description":
"This is an example component", "image": "example_component:latest", "name":
"Second component", "produces": {"embeddings": {"fields": {"data": {"items":
{"type": "float32"}, "type": "array"}}}}}', --input_partition_rows, '10',
--output_partition_size, 30MB, --storage_args, a dummy string arg, --output_manifest_path,
/tmp/outputs/output_manifest_path/data]
image: example_component:latest
inputs:
artifacts:
- {name: first-component-output_manifest_path, path: /tmp/inputs/input_manifest_path/data}
outputs:
artifacts:
- {name: second-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.22
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This
is an example component", "implementation": {"container": {"command": ["python3",
"main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"},
"--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue":
"component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"},
"--output_partition_size", {"inputValue": "output_partition_size"}, "--storage_args",
{"inputValue": "storage_args"}, "--output_manifest_path", {"outputPath":
"output_manifest_path"}], "image": "example_component:latest"}}, "inputs":
[{"description": "Path to the input manifest", "name": "input_manifest_path",
"type": "String"}, {"description": "Metadata arguments containing the run
id and base path", "name": "metadata", "type": "String"}, {"default": "None",
"description": "The component specification as a dictionary", "name": "component_spec",
"type": "JsonObject"}, {"default": "None", "description": "The number of
rows to load per partition. Set to override the automatic partitioning",
"name": "input_partition_rows", "type": "String"}, {"default": "None", "description":
"The size of the output partition size, defaults to 250MB. Set to `disable`
to disable the automatic partitioning", "name": "output_partition_size",
"type": "String"}, {"description": "Storage arguments", "name": "storage_args",
"type": "String"}], "name": "Second component", "outputs": [{"description":
"Path to the output manifest", "name": "output_manifest_path", "type": "String"}]}',
pipelines.kubeflow.org/component_ref: '{"digest": "b20d3957f48cd2540e594e8c9f2f1f67f5a299152522c61a71f697f5e40278c7"}',
pipelines.kubeflow.org/arguments.parameters: '{"component_spec": "{\"args\":
{\"storage_args\": {\"description\": \"Storage arguments\", \"type\": \"str\"}},
\"consumes\": {\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}},
\"description\": \"This is an example component\", \"image\": \"example_component:latest\",
\"name\": \"Second component\", \"produces\": {\"embeddings\": {\"fields\":
{\"data\": {\"items\": {\"type\": \"float32\"}, \"type\": \"array\"}}}}}",
"input_partition_rows": "10", "metadata": "{\"base_path\": \"/foo/bar\",
\"run_id\": \"{{workflow.name}}\"}", "output_partition_size": "30MB", "storage_args":
"a dummy string arg"}'}
- name: third-component
container:
args: []
command: [python3, main.py, --input_manifest_path, /tmp/inputs/input_manifest_path/data,
--metadata, '{"base_path": "/foo/bar", "run_id": "{{workflow.name}}"}', --component_spec,
'{"args": {"some_list": {"description": "Some list", "items": {"type": "int"},
"type": "list"}, "storage_args": {"description": "Storage arguments", "type":
"str"}}, "consumes": {"captions": {"fields": {"data": {"type": "string"}}},
"embeddings": {"fields": {"data": {"items": {"type": "float32"}, "type":
"array"}}}, "images": {"fields": {"data": {"type": "binary"}}}}, "description":
"This is an example component", "image": "example_component:latest", "name":
"Third component", "produces": {"additionalSubsets": false, "images": {"fields":
{"data": {"type": "binary"}}}}}', --input_partition_rows, None, --output_partition_size,
None, --storage_args, a dummy string arg, --some_list, '[1, 2, 3]', --output_manifest_path,
/tmp/outputs/output_manifest_path/data]
image: example_component:latest
inputs:
artifacts:
- {name: second-component-output_manifest_path, path: /tmp/inputs/input_manifest_path/data}
outputs:
artifacts:
- {name: third-component-output_manifest_path, path: /tmp/outputs/output_manifest_path/data}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.22
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"description": "This
is an example component", "implementation": {"container": {"command": ["python3",
"main.py", "--input_manifest_path", {"inputPath": "input_manifest_path"},
"--metadata", {"inputValue": "metadata"}, "--component_spec", {"inputValue":
"component_spec"}, "--input_partition_rows", {"inputValue": "input_partition_rows"},
"--output_partition_size", {"inputValue": "output_partition_size"}, "--storage_args",
{"inputValue": "storage_args"}, "--some_list", {"inputValue": "some_list"},
"--output_manifest_path", {"outputPath": "output_manifest_path"}], "image":
"example_component:latest"}}, "inputs": [{"description": "Path to the input
manifest", "name": "input_manifest_path", "type": "String"}, {"description":
"Metadata arguments containing the run id and base path", "name": "metadata",
"type": "String"}, {"default": "None", "description": "The component specification
as a dictionary", "name": "component_spec", "type": "JsonObject"}, {"default":
"None", "description": "The number of rows to load per partition. Set to
override the automatic partitioning", "name": "input_partition_rows", "type":
"String"}, {"default": "None", "description": "The size of the output partition
size, defaults to 250MB. Set to `disable` to disable the automatic partitioning",
"name": "output_partition_size", "type": "String"}, {"description": "Storage
arguments", "name": "storage_args", "type": "String"}, {"description": "Some
list", "name": "some_list", "type": "JsonArray"}], "name": "Third component",
"outputs": [{"description": "Path to the output manifest", "name": "output_manifest_path",
"type": "String"}]}', pipelines.kubeflow.org/component_ref: '{"digest":
"936f0e13275cc8aab199925252dffe2720a01d94af50e5aa78bf9819ccb4ab27"}', pipelines.kubeflow.org/arguments.parameters: '{"component_spec":
"{\"args\": {\"some_list\": {\"description\": \"Some list\", \"items\":
{\"type\": \"int\"}, \"type\": \"list\"}, \"storage_args\": {\"description\":
\"Storage arguments\", \"type\": \"str\"}}, \"consumes\": {\"captions\":
{\"fields\": {\"data\": {\"type\": \"string\"}}}, \"embeddings\": {\"fields\":
{\"data\": {\"items\": {\"type\": \"float32\"}, \"type\": \"array\"}}},
\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}, \"description\":
\"This is an example component\", \"image\": \"example_component:latest\",
\"name\": \"Third component\", \"produces\": {\"additionalSubsets\": false,
\"images\": {\"fields\": {\"data\": {\"type\": \"binary\"}}}}}", "input_partition_rows":
"None", "metadata": "{\"base_path\": \"/foo/bar\", \"run_id\": \"{{workflow.name}}\"}",
"output_partition_size": "None", "some_list": "[1, 2, 3]", "storage_args":
"a dummy string arg"}'}
arguments:
parameters: []
serviceAccountName: pipeline-runner
Loading

0 comments on commit c12fd20

Please sign in to comment.