
Migrate to KfpV2 #477

Merged
merged 34 commits into from
Oct 11, 2023
Changes from 32 commits

34 commits
17451c5
Feature/vertex compiler (#411)
GeorgesLorre Sep 11, 2023
52102a5
Add vertex runner (#429)
GeorgesLorre Sep 15, 2023
a0ea8e4
Add hardware configs (#433)
PhilippeMoussalli Sep 15, 2023
65e4553
Fix v2 defaults (#436)
PhilippeMoussalli Sep 18, 2023
03b1c3b
Make ComponentSpec the base for arg building
GeorgesLorre Sep 19, 2023
6faf90c
Make ComponentSpec the base for arg building
GeorgesLorre Sep 19, 2023
99016d7
Feature/no artifacts (#444)
GeorgesLorre Sep 21, 2023
12b74ae
Add more default/optional argument logic
GeorgesLorre Sep 21, 2023
32a8c04
Add cluser_type to default args
GeorgesLorre Oct 2, 2023
e49867b
Fix ruff error
GeorgesLorre Oct 2, 2023
cf179f1
Fix isOptional and defaultValue conversion
RobbeSneyders Oct 9, 2023
55fc4fe
Update runner to use KfP v2 API
RobbeSneyders Oct 9, 2023
fa14ea0
Change input_partition_rows to accept -1 as default
RobbeSneyders Oct 9, 2023
cd82bae
Update load_from_hf_hub defaults
RobbeSneyders Oct 9, 2023
987054b
Update download_images defaults
RobbeSneyders Oct 9, 2023
44a7d4d
Merge branch 'main' into feature/kfp-v2
RobbeSneyders Oct 10, 2023
74f7099
Merge branch 'main' into feature/kfp-v2
PhilippeMoussalli Oct 10, 2023
347eec4
re-enable cache
PhilippeMoussalli Oct 10, 2023
c89998c
Fix tests
RobbeSneyders Oct 10, 2023
1815d02
Update datacomp pipeline
RobbeSneyders Oct 10, 2023
081422e
Merge remote-tracking branch 'origin/feature/kfp-v2' into feature/kfp-v2
RobbeSneyders Oct 10, 2023
931df56
Remove python version upper bound
RobbeSneyders Oct 10, 2023
4342aa8
Re-add test suite for Python 3.11
RobbeSneyders Oct 10, 2023
a789c03
Add Python 3.12 upper bound
RobbeSneyders Oct 10, 2023
2108894
Add gcp dependencies to vertex extra
RobbeSneyders Oct 10, 2023
b778f5e
Address PR comments
RobbeSneyders Oct 10, 2023
c7e3a9f
Add Python 3.12 trove classifier to pyproject.toml
RobbeSneyders Oct 10, 2023
c6601eb
Lower python upper bound to 3.11 again to prevent slow dependency res…
RobbeSneyders Oct 10, 2023
d32dffc
Remove 3.11 test suite
RobbeSneyders Oct 10, 2023
fef720e
Address PR comments
RobbeSneyders Oct 10, 2023
3cb3a27
Changes based on self-review
RobbeSneyders Oct 10, 2023
35dad13
Update component defaults for kfpv2
RobbeSneyders Oct 10, 2023
c7ab7d5
Fix tests for kfp 2.3.0
RobbeSneyders Oct 10, 2023
0c6d403
disable kfpv2 default caching
PhilippeMoussalli Oct 11, 2023
2 changes: 1 addition & 1 deletion .github/workflows/pipeline.yaml
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.8', '3.9', '3.10', '3.11']
python-version: ['3.8', '3.9', '3.10']
RobbeSneyders marked this conversation as resolved.
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
16 changes: 8 additions & 8 deletions .pre-commit-config.yaml
@@ -56,11 +56,11 @@ repos:
- types-requests
pass_filenames: false

- repo: local
hooks:
- id: generate_component_readmes
name: Generate component READMEs
language: python
entry: python scripts/component_readme/generate_readme.py
files: ^components/.*/fondant_component.yaml
additional_dependencies: ["fondant"]
# - repo: local
# hooks:
# - id: generate_component_readmes
# name: Generate component READMEs
# language: python
# entry: python scripts/component_readme/generate_readme.py
# files: ^components/.*/fondant_component.yaml
Member:

I had to deactivate this since it installs the latest version of fondant from PyPI, which breaks on the changes we made to the component spec schema. We can re-enable this later.

# additional_dependencies: ["fondant"]
4 changes: 2 additions & 2 deletions components/download_images/fondant_component.yaml
@@ -54,12 +54,12 @@ args:
resize_only_if_bigger:
description: If True, resize only if image is bigger than image_size.
type: bool
default: 'False'
default: False
min_image_size:
description: Minimum size of the images.
type: int
default: 0
max_aspect_ratio:
description: Maximum aspect ratio of the images.
type: float
default: 'inf'
default: 99.9
4 changes: 2 additions & 2 deletions components/load_from_hf_hub/fondant_component.yaml
@@ -19,11 +19,11 @@ args:
description: Optional argument, a list containing the original image column names in case the
dataset on the hub contains them. Used to format the image from HF hub format to a byte string.
type: list
default: None
default: []
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: None
default: -1
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str
Contributor:

Does it make sense to later revert to None and handle the conversion internally? It could be something like setting an arbitrary value if it's None in the fondant component, and parsing it back to None when the component is run.

Member:

I'm not sure if it's such a bad thing that the default needs to match the argument type. We would also need an arbitrary value for each type to make this work.

Contributor:

Previously we would treat an argument as optional if its default was set to None. Now the notion of optional arguments is missing, and optional arguments are checked implicitly against arbitrary sentinel values defined in the default, which can be a bit confusing.

I was thinking more from the perspective of the end user. If they assign a default of None, we would handle it as optional and return it as None to the end user. How to implement this is still not entirely clear to me, but we could assign arbitrary known values. For example, if the user sets a list argument to None, we would pass it as [] and then convert it back to None during parsing.
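The sentinel-mapping idea discussed in this thread could look roughly like the following. This is a hypothetical helper, not code from this PR; the sentinel choices simply mirror the defaults used in the diffs ([] for list, {} for dict, -1 for int, and the literal string "None" for str):

```python
# Hedged sketch: map Python None to a per-type sentinel before handing
# arguments to KFP v2 (which needs typed defaults), and decode the
# sentinel back to None inside the running component.
# Caveat (the ambiguity raised above): a user who genuinely passes the
# sentinel value (e.g. an empty list) is indistinguishable from None.

SENTINELS = {
    list: [],       # empty list stands in for None
    dict: {},       # empty dict stands in for None
    int: -1,        # -1 stands in for None
    str: "None",    # literal string "None" stands in for None
}

def encode_optional(value, arg_type):
    """Replace None with the type's sentinel for the component spec."""
    return SENTINELS[arg_type] if value is None else value

def decode_optional(value, arg_type):
    """Convert the sentinel back to None at component runtime."""
    return None if value == SENTINELS[arg_type] else value
```

A user-facing None would round-trip as `decode_optional(encode_optional(None, int), int)`, while real values pass through unchanged.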

11 changes: 5 additions & 6 deletions components/load_from_hf_hub/src/main.py
@@ -1,6 +1,5 @@
"""This component loads a seed dataset from the hub."""
import logging
import typing as t

import dask
import dask.dataframe as dd
@@ -20,9 +20,9 @@ def __init__(self,
*_,
dataset_name: str,
column_name_mapping: dict,
image_column_names: t.Optional[list],
n_rows_to_load: t.Optional[int],
index_column: t.Optional[str],
image_column_names: list,
Contributor:

Can we keep them as optional since they have a default, or does optional only apply when the default is None by convention?

Member:

Optional means that a None value can be passed in or set as a default. This is no longer the case now.

Contributor:

I still think we should have optional argument (see comment above)

n_rows_to_load: int,
index_column: str,
) -> None:
"""
Args:
@@ -60,7 +60,7 @@ def load(self) -> dd.DataFrame:
dask_df = dask_df.rename(columns=self.column_name_mapping)

# 4) Optional: only return specific amount of rows
if self.n_rows_to_load is not None:
if self.n_rows_to_load > 0:
partitions_length = 0
npartitions = 1
for npartitions, partition in enumerate(dask_df.partitions, start=1):
@@ -73,7 +73,7 @@
dask_df = dd.from_pandas(dask_df, npartitions=npartitions)

# 4) Set the index
if self.index_column is None:
if self.index_column == "None":
logger.info(
"Index column not specified, setting a globally unique index",
)
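Since KFP v2 string arguments can no longer default to Python None, the spec's str default becomes the literal string "None", which the component decodes at runtime — as the changed condition in the hunk above shows. A minimal sketch (hypothetical helper name, not code from this PR):

```python
# Hedged sketch of the string-sentinel pattern: the component spec
# declares `default: None` as a plain string, so the running component
# compares against the literal "None" rather than Python's None.

def resolve_index_column(index_column: str):
    """Return the configured index column, or None to request a
    globally unique auto-generated index."""
    if index_column == "None":  # string sentinel, not Python None
        return None
    return index_column
```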
4 changes: 2 additions & 2 deletions components/load_from_parquet/fondant_component.yaml
@@ -15,11 +15,11 @@ args:
column_name_mapping:
description: Mapping of the consumed dataset
type: dict
default: None
default: {}
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: None
default: -1
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str
8 changes: 4 additions & 4 deletions components/load_from_parquet/src/main.py
@@ -19,8 +19,8 @@ def __init__(self,
spec: ComponentSpec,
*_,
dataset_uri: str,
column_name_mapping: t.Optional[dict],
n_rows_to_load: t.Optional[int],
column_name_mapping: dict,
n_rows_to_load: int,
index_column: t.Optional[str],
) -> None:
"""
@@ -45,12 +45,12 @@ def load(self) -> dd.DataFrame:
dask_df = dd.read_parquet(self.dataset_uri)

# 2) Rename columns
if self.column_name_mapping is not None:
if self.column_name_mapping:
logger.info("Renaming columns...")
dask_df = dask_df.rename(columns=self.column_name_mapping)

# 3) Optional: only return specific amount of rows
if self.n_rows_to_load is not None:
if self.n_rows_to_load > 0:
partitions_length = 0
npartitions = 1
for npartitions, partition in enumerate(dask_df.partitions, start=1):
2 changes: 1 addition & 1 deletion components/segment_images/fondant_component.yaml
@@ -22,4 +22,4 @@ args:
batch_size:
description: batch size to use
type: int
batch_size: 8
default: 8
4 changes: 2 additions & 2 deletions components/write_to_hf_hub/fondant_component.yaml
@@ -21,8 +21,8 @@ args:
image_column_names:
description: A list containing the image column names. Used to format to image to HF hub format
type: list
default: None
default: []
column_name_mapping:
description: Mapping of the consumed fondant column names to the written hub column names
type: dict
default: None
default: {}
6 changes: 3 additions & 3 deletions components/write_to_hf_hub/src/main.py
@@ -39,8 +39,8 @@ def __init__(self,
hf_token: str,
username: str,
dataset_name: str,
image_column_names: t.Optional[list],
column_name_mapping: t.Optional[dict],
image_column_names: list,
column_name_mapping: dict,
):
"""
Args:
@@ -87,7 +87,7 @@ def write(
# Map image column to hf data format
feature_encoder = datasets.Image(decode=True)

if self.image_column_names is not None:
if self.image_column_names:
for image_column_name in self.image_column_names:
dataframe[image_column_name] = dataframe[image_column_name].map(
lambda x: convert_bytes_to_image(x, feature_encoder),
3 changes: 0 additions & 3 deletions docs/components/component_spec.md
@@ -127,9 +127,6 @@ The `args` section describes which arguments the component takes. Each argument
`description` and a `type`, which should be one of the builtin Python types. Additionally, you can
set an optional `default` value for each argument.

_Note:_ default iterable arguments such as `dict` and `list` have to be passed as a string
RobbeSneyders marked this conversation as resolved.
(e.g. `'{"foo":1, "bar":2}`, `'["foo","bar]'`)

```yaml
args:
custom_argument:
3 changes: 2 additions & 1 deletion docs/pipeline.md
@@ -30,7 +30,8 @@ def build_pipeline():
"batch_size": 2,
"max_new_tokens": 50,
},
number_of_gpus=1,
number_of_accelerators=1,
accelerator_name="GPU",
node_pool_label="node_pool",
node_pool_name="model-inference-pool",
)
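The resource API rename shown in this hunk (`number_of_gpus` replaced by `number_of_accelerators` plus an explicit `accelerator_name`) can be illustrated with a small stand-in class. This is not fondant's actual `ComponentOp` implementation, only a sketch of the new call shape:

```python
# Stand-in for fondant's ComponentOp, used only to show the renamed
# keyword arguments; the real class lives in the fondant package.
class ComponentOp:
    def __init__(self, name, arguments=None,
                 number_of_accelerators=0, accelerator_name=None,
                 node_pool_label=None, node_pool_name=None):
        self.name = name
        self.arguments = arguments or {}
        self.number_of_accelerators = number_of_accelerators
        self.accelerator_name = accelerator_name
        self.node_pool_label = node_pool_label
        self.node_pool_name = node_pool_name

op = ComponentOp(
    "caption_images",
    arguments={"batch_size": 2, "max_new_tokens": 50},
    number_of_accelerators=1,   # replaces number_of_gpus=1
    accelerator_name="GPU",     # accelerator type is now explicit
)
```

Making the accelerator name explicit lets the same parameter cover GPUs, TPUs, or other device types across runners.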
@@ -26,4 +26,4 @@ args:
extract_plain_text:
description: If set to true the data contains the plain text without html tags
type: bool
default: "False"
default: False
@@ -15,4 +15,4 @@ args:
n_records_to_download:
description: Number of records to download
type: int
default: None
default: -1
@@ -18,7 +18,7 @@ def __init__(
self,
*_,
common_crawl_indices: t.List[str],
n_records_to_download: t.Optional[int] = None,
n_records_to_download: int,
):
self.index_urls = [
self.build_index_url(index_name) for index_name in common_crawl_indices
@@ -38,7 +38,7 @@ def load(self) -> dd.DataFrame:
warc_urls.extend([line.decode() for line in extracted.split(b"\n")])

df = pd.DataFrame(warc_urls, columns=["warc_url"])
if self.n_records_to_download is not None:
if self.n_records_to_download > 0:
df = df.head(self.n_records_to_download)

return dd.from_pandas(df, npartitions=len(df) // 100)
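The truncation guard above follows the same -1 sentinel convention as the other loaders: a non-positive count means "download everything". Roughly (hypothetical helper, pandas-only for brevity):

```python
import pandas as pd

def limit_records(df: pd.DataFrame, n_records_to_download: int) -> pd.DataFrame:
    """Truncate df when a positive record count is given; -1 (the new
    default) disables truncation entirely."""
    if n_records_to_download > 0:
        return df.head(n_records_to_download)
    return df
```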
@@ -12,4 +12,4 @@ args:
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: None
default: -1
@@ -114,7 +114,7 @@ def load(self) -> dd.DataFrame:

pandas_df = pd.DataFrame(prompts, columns=["prompts_text"])

if self.n_rows_to_load:
if self.n_rows_to_load > 0:
pandas_df = pandas_df.head(self.n_rows_to_load)

df = dd.from_pandas(pandas_df, npartitions=1)
@@ -31,8 +31,8 @@ args:
image_column_names:
description: A list containing the image column names. Used to format to image to HF hub format
type: list
default: None
default: []
column_name_mapping:
description: Mapping of the consumed fondant column names to the written hub column names
type: dict
default: None
default: {}
6 changes: 4 additions & 2 deletions examples/pipelines/controlnet-interior-design/pipeline.py
@@ -45,15 +45,17 @@
"batch_size": 2,
"max_new_tokens": 50,
},
number_of_gpus=1,
number_of_accelerators=1,
accelerator_name="GPU",
)
segment_images_op = ComponentOp.from_registry(
name="segment_images",
arguments={
"model_id": "openmmlab/upernet-convnext-small",
"batch_size": 2,
},
number_of_gpus=1,
number_of_accelerators=1,
accelerator_name="GPU",
)

write_to_hub_controlnet = ComponentOp(
@@ -16,7 +16,6 @@ consumes:
items:
type: float32


produces:
imagetext:
fields:
@@ -50,11 +50,11 @@ args:
description: Optional argument, a list containing the original image column names in case the
dataset on the hub contains them. Used to format the image from HF hub format to a byte string.
type: list
default: None
default: []
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: None
default: -1
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str