Modify arg default #512

Merged: 13 commits, Oct 14, 2023
4 changes: 2 additions & 2 deletions components/load_from_hf_hub/fondant_component.yaml
@@ -19,11 +19,11 @@ args:
description: Optional argument, a list containing the original image column names in case the
dataset on the hub contains them. Used to format the image from HF hub format to a byte string.
type: list
default: []
default: None
Member

I think for a list, [] makes more sense than None.

Also in the other components.

Contributor Author

I am not sure I agree; by definition it would then be either:

  • a normal argument with [] as default and no longer optional
  • Optional argument with [] as default which is not really common since Optional types don't really mix well with mutable data types

Member

  • a normal argument with [] as default and no longer optional

If a default is defined, it is optional. It's just not KFP's isOptional. I don't see any issue with this.

  • Optional argument with [] as default which is not really common since Optional types don't really mix well with mutable data types

In Python this is indeed an issue; however, we don't need to define the default in Python. We can define it only in the fondant_component.yaml:

fondant_component.yaml

    default: []

main.py

    image_column_names: list,
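
To make the concern about mutable defaults concrete, here is a minimal sketch in plain Python. It is not Fondant's implementation: `append_item`, `LoadComponent`, `SPEC_DEFAULTS` and `build_component` are hypothetical names, and the only assumption is that the default from the YAML spec reaches the component as a JSON string that is parsed on every run.

```python
import json


def append_item(item: str, bucket: list = []) -> list:
    # Pitfall: the default list is created once, when the function is defined,
    # so every call that relies on the default shares the same object.
    bucket.append(item)
    return bucket


class LoadComponent:
    """Hypothetical component: the signature carries no default at all."""

    def __init__(self, *, image_column_names: list) -> None:
        self.image_column_names = image_column_names


# Hypothetical stand-in for the default declared in fondant_component.yaml.
SPEC_DEFAULTS = {"image_column_names": "[]"}


def build_component(user_args: dict) -> LoadComponent:
    # Fall back to the spec default and parse it, which yields a fresh list each time.
    raw = user_args.get("image_column_names", SPEC_DEFAULTS["image_column_names"])
    return LoadComponent(image_column_names=json.loads(raw))
```

Because the default lives outside the Python signature, each component instance gets its own list, and the shared-object problem of `append_item` does not arise.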

Contributor Author

We're just trying to indicate the absence of a value, which is what None is better suited for. The empty list in this case won't be used for appending or for modifying behavior. Is there any added advantage compared to None?

It also seems like we're making an arbitrary choice on which data types to define as having empty values. Should we also include dictionaries?

Member

The advantage is that you don't need to handle the None case in the code, and can always assume that the value is of the type defined in the argument.

I would indeed include dictionaries as well. I don't think that's arbitrary.

This is btw just component implementation. Fondant supports None for list and dict as well. I just don't think we need to use it 😛
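
The trade-off in this thread can be sketched as follows. Both functions are hypothetical and only illustrate the control flow; neither is taken from a Fondant component.

```python
import typing as t


def image_columns_optional(image_column_names: t.Optional[list]) -> list:
    # With None as the default, the absence of a value needs its own branch.
    if image_column_names is None:
        return []
    return [f"images_{name}" for name in image_column_names]


def image_columns_empty_default(image_column_names: list) -> list:
    # With [] as the default, the value is always a list and no branch is needed.
    return [f"images_{name}" for name in image_column_names]
```

Both variants give the same result for a real list; they differ only in where the "no value" case is handled.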

n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: -1
default: None
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str
16 changes: 10 additions & 6 deletions components/load_from_hf_hub/src/main.py
@@ -1,5 +1,6 @@
"""This component loads a seed dataset from the hub."""
import logging
import typing as t

import dask
import dask.dataframe as dd
@@ -19,9 +20,9 @@ def __init__(self,
*_,
dataset_name: str,
column_name_mapping: dict,
image_column_names: list,
n_rows_to_load: int,
index_column: str,
image_column_names: t.Optional[list],
n_rows_to_load: t.Optional[int],
index_column: t.Optional[str],
) -> None:
"""
Args:
@@ -57,6 +58,9 @@ def load(self) -> dd.DataFrame:
(subset_field_name, subset_field_name)
columns.append(column_name)

if self.index_column is not None:
Contributor Author

This is needed to keep the index column, which is dropped later on (if it exists). We weren't picking it up since it was not in the component spec or the remapping dict.

columns.append(self.index_column)

logger.debug(f"Columns to keep: {columns}")
dask_df = dd.read_parquet(f"hf://datasets/{self.dataset_name}", columns=columns)

@@ -72,7 +76,7 @@ def load(self) -> dd.DataFrame:
dask_df = dask_df.rename(columns=self.column_name_mapping)

# 4) Optional: only return specific amount of rows
if self.n_rows_to_load > 0:
if self.n_rows_to_load is not None:
partitions_length = 0
npartitions = 1
for npartitions, partition in enumerate(dask_df.partitions, start=1):
@@ -84,8 +88,8 @@ def load(self) -> dd.DataFrame:
dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions)
dask_df = dd.from_pandas(dask_df, npartitions=npartitions)

# 5) Set the index
if self.index_column == "None":
# 4) Set the index
if self.index_column is None:
logger.info(
"Index column not specified, setting a globally unique index",
)
3 changes: 3 additions & 0 deletions components/load_from_parquet/src/main.py
@@ -54,6 +54,9 @@ def load(self) -> dd.DataFrame:
(subset_field_name, subset_field_name)
columns.append(column_name)

if self.index_column is not None:
columns.append(self.index_column)

logger.debug(f"Columns to keep: {columns}")
dask_df = dd.read_parquet(self.dataset_uri, columns=columns)

6 changes: 3 additions & 3 deletions components/write_to_hf_hub/src/main.py
@@ -39,8 +39,8 @@ def __init__(self,
hf_token: str,
username: str,
dataset_name: str,
image_column_names: list,
column_name_mapping: dict,
image_column_names: t.Optional[list],
column_name_mapping: t.Optional[dict],
):
"""
Args:
@@ -87,7 +87,7 @@ def write(
# Map image column to hf data format
feature_encoder = datasets.Image(decode=True)

if self.image_column_names:
if self.image_column_names is not None:
for image_column_name in self.image_column_names:
dataframe[image_column_name] = dataframe[image_column_name].map(
lambda x: convert_bytes_to_image(x, feature_encoder),
Original file line number Diff line number Diff line change
@@ -15,4 +15,4 @@ args:
n_records_to_download:
description: Number of records to download
type: int
default: -1
default: None
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@ def __init__(
self,
*_,
common_crawl_indices: t.List[str],
n_records_to_download: int,
n_records_to_download: t.Optional[int] = None,
):
self.index_urls = [
self.build_index_url(index_name) for index_name in common_crawl_indices
@@ -38,7 +38,7 @@ def load(self) -> dd.DataFrame:
warc_urls.extend([line.decode() for line in extracted.split(b"\n")])

df = pd.DataFrame(warc_urls, columns=["warc_url"])
if self.n_records_to_download > 0:
if self.n_records_to_download is not None:
df = df.head(self.n_records_to_download)

return dd.from_pandas(df, npartitions=len(df) // 100)
Original file line number Diff line number Diff line change
@@ -12,4 +12,4 @@ args:
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: -1
default: None
Original file line number Diff line number Diff line change
@@ -114,7 +114,7 @@ def load(self) -> dd.DataFrame:

pandas_df = pd.DataFrame(prompts, columns=["prompts_text"])

if self.n_rows_to_load > 0:
if self.n_rows_to_load:
pandas_df = pandas_df.head(self.n_rows_to_load)

df = dd.from_pandas(pandas_df, npartitions=1)
Original file line number Diff line number Diff line change
@@ -31,8 +31,8 @@ args:
image_column_names:
description: A list containing the image column names. Used to format to image to HF hub format
type: list
default: []
default: None
column_name_mapping:
description: Mapping of the consumed fondant column names to the written hub column names
type: dict
default: {}
default: None
Original file line number Diff line number Diff line change
@@ -50,11 +50,11 @@ args:
description: Optional argument, a list containing the original image column names in case the
dataset on the hub contains them. Used to format the image from HF hub format to a byte string.
type: list
default: []
default: None
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: -1
default: None
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str
Original file line number Diff line number Diff line change
@@ -24,11 +24,11 @@ args:
description: Optional argument, a list containing the original image column names in case the
dataset on the hub contains them. Used to format the image from HF hub format to a byte string.
type: list
default: []
default: None
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: -1
default: None
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str
Original file line number Diff line number Diff line change
@@ -26,8 +26,8 @@ args:
image_column_names:
description: A list containing the image column names. Used to format to image to HF hub format
type: list
default: []
default: None
column_name_mapping:
description: Mapping of the consumed fondant column names to the written hub column names
type: dict
default: {}
default: None
Original file line number Diff line number Diff line change
@@ -33,11 +33,11 @@ args:
description: A list containing the original hub image column names. Used to format the image
from HF hub format to a byte string
type: list
default: []
default: None
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: -1
default: None
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str
11 changes: 6 additions & 5 deletions src/fondant/component_spec.py
@@ -34,7 +34,7 @@ class Argument:
name: str
description: str
type: str
default: t.Any = None
default: t.Optional[t.Any] = None
optional: t.Optional[bool] = False

@property
@@ -47,7 +47,8 @@ def python_type(self) -> t.Any:
"dict": json.loads,
"list": json.loads,
}
return lookup[self.type]
map_fn = lookup[self.type]
return lambda value: map_fn(value) if value != "None" else None # type: ignore
Member

Is this needed? I see that we only use this to register the arguments. Setting the type as None feels strange.

Contributor Author

This can be a bit confusing, so I'll recap:

  • In Kubeflow, optional types that default to None are defined as optional in the spec, with no constant runtime value. See input3 here and here. They also should not be passed to the componentOp, which is why we remove them here
  • In Docker, all arguments are passed as strings. Above, we define a map function per type that converts each value back to its original type. If the value is the string "None", we convert it back to None. We actually had this in the v1 implementation; I'm not sure why it was removed
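
The string-to-type conversion described above might look like the sketch below. The `LOOKUP` table is an assumption (only the `dict` and `list` entries are visible in this diff), and `parser_for` is a hypothetical helper name; the point is just that the string "None" is mapped back to `None` before the type conversion runs.

```python
import argparse
import json
import typing as t

# Assumed mapping; only the "dict" and "list" entries appear in the diff.
LOOKUP: t.Dict[str, t.Callable[[str], t.Any]] = {
    "str": str,
    "int": int,
    "dict": json.loads,
    "list": json.loads,
}


def parser_for(type_name: str) -> t.Callable[[str], t.Any]:
    map_fn = LOOKUP[type_name]
    # Arguments arrive as strings, so a missing value shows up as "None".
    return lambda value: map_fn(value) if value != "None" else None


# Usage: argparse accepts any callable as `type`, so the wrapper plugs in directly.
parser = argparse.ArgumentParser()
parser.add_argument("--n_rows_to_load", type=parser_for("int"))
parser.add_argument("--image_column_names", type=parser_for("list"))

args = parser.parse_args(
    ["--n_rows_to_load", "None", "--image_column_names", '["image"]'],
)
```

Here `args.n_rows_to_load` ends up as `None` rather than the string, while the list argument is parsed from JSON as usual.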

Member

Ok, I was a bit confused by the notation, but looking at it again, it's clear to me why this is needed. Thanks!


@property
def kubeflow_type(self) -> str:
@@ -230,7 +231,7 @@ def default_arguments(self) -> t.Dict[str, Argument]:
description="The number of rows to load per partition. \
Set to override the automatic partitioning",
type="int",
default=-1,
optional=True,
),
"cache": Argument(
name="cache",
@@ -286,9 +287,9 @@ def convert_arguments(fondant_component: ComponentSpec):
for arg in fondant_component.args.values():
arg_type_dict = {}

if arg.optional or arg.default is not None:
if arg.optional and arg.default is None:
arg_type_dict["isOptional"] = True
if arg.default is not None:
if arg.default is not None and arg.default != "None":
Member

Do we need this? Shouldn't this PR make sure that "None" is no longer needed, but None can be used instead?

Contributor Author

Good catch. It's indeed not needed here, since this should be triggered at compilation.
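
Putting the two conditions from this hunk together, the conversion to the Kubeflow input definition could be sketched as follows. This is a simplified stand-in, not the code in `component_spec.py`: the `Argument` dataclass is reduced to the fields used here, `to_kfp_input` is a hypothetical name, and the type table only lists the two parameter types that appear in the compiled YAML in this PR. It uses the conditions from the added lines and drops the `"None"` string check, as agreed in the thread.

```python
import typing as t
from dataclasses import dataclass


@dataclass
class Argument:
    name: str
    type: str
    default: t.Optional[t.Any] = None
    optional: bool = False


# Only the two parameter types visible in this PR's compiled specs.
KFP_TYPES = {"str": "STRING", "int": "NUMBER_INTEGER"}


def to_kfp_input(arg: Argument) -> dict:
    arg_type_dict: t.Dict[str, t.Any] = {}
    # Optional without a default: flag it, but emit no constant value.
    if arg.optional and arg.default is None:
        arg_type_dict["isOptional"] = True
    # A real default is passed through as defaultValue.
    if arg.default is not None:
        arg_type_dict["defaultValue"] = arg.default
    arg_type_dict["parameterType"] = KFP_TYPES[arg.type]
    return arg_type_dict
```

For an optional `input_partition_rows` without a default, this yields `isOptional` and `parameterType` only, which matches the compiled specs further down where `defaultValue: -1` is removed.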

arg_type_dict["defaultValue"] = arg.default

args[arg.name] = {
31 changes: 16 additions & 15 deletions src/fondant/data_io.py
@@ -24,7 +24,7 @@ def __init__(
*,
manifest: Manifest,
component_spec: ComponentSpec,
input_partition_rows: int = -1,
input_partition_rows: t.Optional[int],
):
super().__init__(manifest=manifest, component_spec=component_spec)
self.input_partition_rows = input_partition_rows
@@ -38,7 +38,20 @@ def partition_loaded_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame:
"""
n_workers: int = os.cpu_count() # type: ignore

if self.input_partition_rows > 1:
if self.input_partition_rows is None:
n_partitions = dataframe.npartitions
if n_partitions < n_workers: # type: ignore
logger.info(
f"The number of partitions of the input dataframe is {n_partitions}. The "
f"available number of workers is {n_workers}.",
)
dataframe = dataframe.repartition(npartitions=n_workers)
logger.info(
f"Repartitioning the data to {n_workers} partitions before processing"
f" to maximize worker usage",
)

elif self.input_partition_rows > 1:
Member

Suggested change
elif self.input_partition_rows > 1:
elif self.input_partition_rows >= 1:
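
The branching in `partition_loaded_dataframe`, with the `>= 1` suggestion applied, can be sketched without Dask as a pure function. `target_partitions` is a hypothetical helper; the rounding used for the row-based case is an assumption, since that part of the method is collapsed in the diff.

```python
import math
import typing as t


def target_partitions(
    total_rows: int,
    current_partitions: int,
    n_workers: int,
    input_partition_rows: t.Optional[int],
) -> int:
    if input_partition_rows is None:
        # No explicit size: only repartition upwards so that every worker gets data.
        return max(current_partitions, n_workers)
    if input_partition_rows >= 1:
        # Explicit size: enough partitions to hold all rows (rounding is assumed).
        return max(1, math.ceil(total_rows / input_partition_rows))
    msg = (
        f"{input_partition_rows} is not a valid value for 'input_partition_rows'. "
        "It should be a number larger than 0, or None."
    )
    raise ValueError(msg)
```

With `> 1` instead of `>= 1`, a value of exactly 1 would fall through to the error branch even though the message describes any number larger than 0 as valid.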

# Only load the index column to trigger a faster compute of the rows
total_rows = len(dataframe.index)
# +1 to handle any remainder rows
@@ -56,23 +69,11 @@ def partition_loaded_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame:
f" all available workers {n_partitions} out of {n_workers} are used.",
)

elif self.input_partition_rows == -1:
n_partitions = dataframe.npartitions
if n_partitions < n_workers: # type: ignore
logger.info(
f"The number of partitions of the input dataframe is {n_partitions}. The "
f"available number of workers is {n_workers}.",
)
dataframe = dataframe.repartition(npartitions=n_workers)
logger.info(
f"Repartitioning the data to {n_workers} partitions before processing"
f" to maximize worker usage",
)
else:
msg = (
f"{self.input_partition_rows} is not a valid value for the 'input_partition_rows' "
f"parameter. It should be a number larger than 0 to indicate the number of "
f"expected rows per partition, or '-1' to let Fondant optimize the number of "
f"expected rows per partition, or None to let Fondant optimize the number of "
f"partitions based on the number of available workers."
)
raise ValueError(
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@ components:
isOptional: true
parameterType: STRING
input_partition_rows:
defaultValue: -1
isOptional: true
parameterType: NUMBER_INTEGER
metadata:
@@ -48,7 +47,6 @@ components:
isOptional: true
parameterType: STRING
input_partition_rows:
defaultValue: -1
isOptional: true
parameterType: NUMBER_INTEGER
metadata:
@@ -75,7 +73,6 @@ components:
isOptional: true
parameterType: STRING
input_partition_rows:
defaultValue: -1
isOptional: true
parameterType: NUMBER_INTEGER
metadata:
Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@ components:
isOptional: true
parameterType: STRING
input_partition_rows:
defaultValue: -1
isOptional: true
parameterType: NUMBER_INTEGER
metadata:
@@ -47,7 +46,6 @@ components:
isOptional: true
parameterType: STRING
input_partition_rows:
defaultValue: -1
isOptional: true
parameterType: NUMBER_INTEGER
metadata:
@@ -74,7 +72,6 @@ components:
isOptional: true
parameterType: STRING
input_partition_rows:
defaultValue: -1
isOptional: true
parameterType: NUMBER_INTEGER
metadata:
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@ components:
isOptional: true
parameterType: STRING
input_partition_rows:
defaultValue: -1.0
isOptional: true
parameterType: NUMBER_INTEGER
metadata:
@@ -52,7 +51,6 @@ components:
isOptional: true
parameterType: STRING
input_partition_rows:
defaultValue: -1.0
isOptional: true
parameterType: NUMBER_INTEGER
metadata:
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@ components:
isOptional: true
parameterType: STRING
input_partition_rows:
defaultValue: -1
isOptional: true
parameterType: NUMBER_INTEGER
metadata:
@@ -52,7 +51,6 @@ components:
isOptional: true
parameterType: STRING
input_partition_rows:
defaultValue: -1.0
isOptional: true
parameterType: NUMBER_INTEGER
metadata: