Modify arg default #512
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
"""This component loads a seed dataset from the hub.""" | ||
import logging | ||
import typing as t | ||
|
||
import dask | ||
import dask.dataframe as dd | ||
|
@@ -19,9 +20,9 @@ def __init__(self, | |
*_, | ||
dataset_name: str, | ||
column_name_mapping: dict, | ||
image_column_names: list, | ||
n_rows_to_load: int, | ||
index_column: str, | ||
image_column_names: t.Optional[list], | ||
n_rows_to_load: t.Optional[int], | ||
index_column: t.Optional[str], | ||
) -> None: | ||
""" | ||
Args: | ||
|
@@ -57,6 +58,9 @@ def load(self) -> dd.DataFrame: | |
(subset_field_name, subset_field_name) | ||
columns.append(column_name) | ||
|
||
if self.index_column is not None: | ||
This is needed to keep the index column that will be dropped later on (if it exists). We weren't picking up on it since it was not in the component spec or the remapping dict. |
||
columns.append(self.index_column) | ||
|
||
logger.debug(f"Columns to keep: {columns}") | ||
dask_df = dd.read_parquet(f"hf://datasets/{self.dataset_name}", columns=columns) | ||
|
||
|
@@ -72,7 +76,7 @@ def load(self) -> dd.DataFrame: | |
dask_df = dask_df.rename(columns=self.column_name_mapping) | ||
|
||
# 4) Optional: only return specific amount of rows | ||
if self.n_rows_to_load > 0: | ||
if self.n_rows_to_load is not None: | ||
partitions_length = 0 | ||
npartitions = 1 | ||
for npartitions, partition in enumerate(dask_df.partitions, start=1): | ||
|
@@ -84,8 +88,8 @@ def load(self) -> dd.DataFrame: | |
dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions) | ||
dask_df = dd.from_pandas(dask_df, npartitions=npartitions) | ||
|
||
# 5) Set the index | ||
if self.index_column == "None": | ||
# 4) Set the index | ||
if self.index_column is None: | ||
logger.info( | ||
"Index column not specified, setting a globally unique index", | ||
) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,4 +15,4 @@ args: | |
n_records_to_download: | ||
description: Number of records to download | ||
type: int | ||
default: -1 | ||
default: None |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,7 +34,7 @@ class Argument: | |
name: str | ||
description: str | ||
type: str | ||
default: t.Any = None | ||
default: t.Optional[t.Any] = None | ||
optional: t.Optional[bool] = False | ||
|
||
@property | ||
|
@@ -47,7 +47,8 @@ def python_type(self) -> t.Any: | |
"dict": json.loads, | ||
"list": json.loads, | ||
} | ||
return lookup[self.type] | ||
map_fn = lookup[self.type] | ||
return lambda value: map_fn(value) if value != "None" else None # type: ignore | ||
Is this needed? I see that we only use this to register the arguments. Setting the type as None feels strange.
This can be a bit confusing so I'll recap a bit:
Ok, I was a bit confused by the notation, but looking at it again, it's clear to me why this is needed. Thanks! |
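The wrapper discussed above can be sketched in isolation. This is a minimal illustration, not Fondant's actual code: `LOOKUP` and `make_parser` are hypothetical names, only the `dict` and `list` entries of the lookup table are visible in the diff (the `str` and `int` entries are assumed), and using `argparse` to register the arguments is likewise an assumption.

```python
import argparse
import json
import typing as t

# Hypothetical stand-in for the lookup table in `Argument.python_type`.
# Only the "dict" and "list" entries appear in the diff; the rest is assumed.
LOOKUP: t.Dict[str, t.Callable[[str], t.Any]] = {
    "str": str,
    "int": int,
    "dict": json.loads,
    "list": json.loads,
}


def make_parser(type_name: str) -> t.Callable[[str], t.Any]:
    """Return a converter that maps the literal string "None" to None."""
    map_fn = LOOKUP[type_name]
    return lambda value: map_fn(value) if value != "None" else None


# Assumed usage: the converter is passed as the `type` of a CLI argument,
# so a serialized "None" arrives in the component as a real None.
parser = argparse.ArgumentParser()
parser.add_argument("--n_rows_to_load", type=make_parser("int"))
parser.add_argument("--image_column_names", type=make_parser("list"))
args = parser.parse_args(
    ["--n_rows_to_load", "None", "--image_column_names", '["image"]'],
)
```

Without the wrapper, `int("None")` would raise a `ValueError`, which is presumably why the conversion has to special-case the string.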
||
|
||
@property | ||
def kubeflow_type(self) -> str: | ||
|
@@ -230,7 +231,7 @@ def default_arguments(self) -> t.Dict[str, Argument]: | |
description="The number of rows to load per partition. \ | ||
Set to override the automatic partitioning", | ||
type="int", | ||
default=-1, | ||
optional=True, | ||
), | ||
"cache": Argument( | ||
name="cache", | ||
|
@@ -286,9 +287,9 @@ def convert_arguments(fondant_component: ComponentSpec): | |
for arg in fondant_component.args.values(): | ||
arg_type_dict = {} | ||
|
||
if arg.optional or arg.default is not None: | ||
if arg.optional and arg.default is None: | ||
|
||
arg_type_dict["isOptional"] = True | ||
if arg.default is not None: | ||
if arg.default is not None and arg.default != "None": | ||
Do we need this? Shouldn't this PR make sure that
Good catch, it's not needed here indeed since this should be triggered at compilation. |
||
arg_type_dict["defaultValue"] = arg.default | ||
|
||
args[arg.name] = { | ||
|
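The rule in this hunk (an argument is `isOptional` only when it is optional and has no default, and a `defaultValue` is emitted only when a default exists) can be sketched as a standalone function. `Arg` and `to_kfp_entry` are hypothetical, simplified stand-ins for Fondant's `Argument` and the loop body of `convert_arguments`, and the sketch assumes the extra `!= "None"` check is dropped, as agreed in the review comment above.

```python
import typing as t
from dataclasses import dataclass


@dataclass
class Arg:
    """Hypothetical, simplified stand-in for Fondant's `Argument`."""

    name: str
    type: str
    default: t.Optional[t.Any] = None
    optional: bool = False


def to_kfp_entry(arg: Arg) -> t.Dict[str, t.Any]:
    """Build the optional/default part of an argument entry."""
    entry: t.Dict[str, t.Any] = {}
    # Optional without a default: the value may be omitted entirely.
    if arg.optional and arg.default is None:
        entry["isOptional"] = True
    # A real default is forwarded as-is.
    if arg.default is not None:
        entry["defaultValue"] = arg.default
    return entry
```

With this rule, an argument that has a default never carries `isOptional`, which matches the later remark in the thread that a defined default is "optional", just not in the KFP sense.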
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -24,7 +24,7 @@ def __init__( | |||||
*, | ||||||
manifest: Manifest, | ||||||
component_spec: ComponentSpec, | ||||||
input_partition_rows: int = -1, | ||||||
input_partition_rows: t.Optional[int] = None, | ||||||
): | ||||||
super().__init__(manifest=manifest, component_spec=component_spec) | ||||||
self.input_partition_rows = input_partition_rows | ||||||
|
@@ -38,7 +38,20 @@ def partition_loaded_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame: | |||||
""" | ||||||
n_workers: int = os.cpu_count() # type: ignore | ||||||
|
||||||
if self.input_partition_rows > 1: | ||||||
if self.input_partition_rows is None: | ||||||
n_partitions = dataframe.npartitions | ||||||
if n_partitions < n_workers: # type: ignore | ||||||
logger.info( | ||||||
f"The number of partitions of the input dataframe is {n_partitions}. The " | ||||||
f"available number of workers is {n_workers}.", | ||||||
) | ||||||
dataframe = dataframe.repartition(npartitions=n_workers) | ||||||
logger.info( | ||||||
f"Repartitioning the data to {n_workers} partitions before processing" | ||||||
f" to maximize worker usage", | ||||||
) | ||||||
|
||||||
elif self.input_partition_rows > 1: | ||||||
|
||||||
# Only load the index column to trigger a faster compute of the rows | ||||||
total_rows = len(dataframe.index) | ||||||
# +1 to handle any remainder rows | ||||||
|
@@ -56,23 +69,11 @@ def partition_loaded_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame: | |||||
f" all available workers {n_partitions} out of {n_workers} are used.", | ||||||
) | ||||||
|
||||||
elif self.input_partition_rows == -1: | ||||||
n_partitions = dataframe.npartitions | ||||||
if n_partitions < n_workers: # type: ignore | ||||||
logger.info( | ||||||
f"The number of partitions of the input dataframe is {n_partitions}. The " | ||||||
f"available number of workers is {n_workers}.", | ||||||
) | ||||||
dataframe = dataframe.repartition(npartitions=n_workers) | ||||||
logger.info( | ||||||
f"Repartitioning the data to {n_workers} partitions before processing" | ||||||
f" to maximize worker usage", | ||||||
) | ||||||
else: | ||||||
msg = ( | ||||||
f"{self.input_partition_rows} is not a valid value for the 'input_partition_rows' " | ||||||
f"parameter. It should be a number larger than 0 to indicate the number of " | ||||||
f"expected rows per partition, or '-1' to let Fondant optimize the number of " | ||||||
f"expected rows per partition, or None to let Fondant optimize the number of " | ||||||
f"partitions based on the number of available workers." | ||||||
) | ||||||
raise ValueError( | ||||||
|
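The branching in `partition_loaded_dataframe` after this change can be summarized as a pure function, which makes the three cases easier to see without a Dask dataframe. `plan_partitions` is a hypothetical helper, and the formula in the second branch is an assumption: the diff only shows the comment "+1 to handle any remainder rows", not the computation itself.

```python
import typing as t


def plan_partitions(
    input_partition_rows: t.Optional[int],
    current_partitions: int,
    n_workers: int,
    total_rows: int,
) -> int:
    """Return the number of partitions the dataframe should end up with."""
    if input_partition_rows is None:
        # Automatic mode: only repartition upwards, so every worker gets work.
        return max(current_partitions, n_workers)
    if input_partition_rows > 1:
        # Assumed formula: floor division, +1 to handle any remainder rows.
        return total_rows // input_partition_rows + 1
    msg = (
        f"{input_partition_rows} is not a valid value for the "
        f"'input_partition_rows' parameter."
    )
    raise ValueError(msg)
```

Note that, as written in the diff, the check is `> 1`, so a value of `1` falls through to the error branch even though the message speaks of "a number larger than 0".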
I think for a list, `[]` makes more sense than `None`. Also in the other components.

I am not sure I agree, then by definition it's either:
`Optional` types don't really mix well with mutable data types.

If a default is defined, it is optional. It's just not KFP's `isOptional`. I don't see any issue with this.
In Python this is an issue indeed, however we don't need to define the default in Python. We can just define the default in the `fondant_component.yaml`:
fondant_component.yaml
main.py

We're just trying to indicate the absence of a value, which is what `None` is better suited for. The empty list in this case won't be used for appending or modifying a certain behavior. Is there any added advantage compared to `None`?
It also seems like we're making an arbitrary choice on which data types to define as having empty values. Should we also include dictionaries?

The advantage is that you don't need to handle the `None` case in the code, and can always assume that the value is of the type defined in the argument.
I would indeed include dictionaries as well. I don't think that's arbitrary.
This is btw just component implementation. Fondant supports `None` for `list` and `dict` as well. I just don't think we need to use it 😛