Modify arg default #512

Merged: 13 commits, Oct 14, 2023
3 changes: 2 additions & 1 deletion components/load_from_hf_hub/fondant_component.yaml
@@ -15,6 +15,7 @@ args:
column_name_mapping:
description: Mapping of the consumed hub dataset to fondant column names
type: dict
default: {}
image_column_names:
description: Optional argument, a list containing the original image column names in case the
dataset on the hub contains them. Used to format the image from HF hub format to a byte string.
@@ -23,7 +24,7 @@ args:
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: -1
default: None
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str
113 changes: 72 additions & 41 deletions components/load_from_hf_hub/src/main.py
@@ -1,5 +1,6 @@
"""This component loads a seed dataset from the hub."""
import logging
import typing as t

import dask
import dask.dataframe as dd
@@ -18,10 +19,10 @@ def __init__(
spec: ComponentSpec,
*_,
dataset_name: str,
column_name_mapping: dict,
image_column_names: list,
n_rows_to_load: int,
index_column: str,
column_name_mapping: t.Optional[dict],
image_column_names: t.Optional[list],
n_rows_to_load: t.Optional[int],
index_column: t.Optional[str],
) -> None:
"""
Args:
@@ -42,55 +43,45 @@ def __init__(
self.index_column = index_column
self.spec = spec

def load(self) -> dd.DataFrame:
# 1) Load data, read as Dask dataframe
logger.info("Loading dataset from the hub...")

def get_columns_to_keep(self) -> t.List[str]:
# Only read required columns
columns = []

invert_column_name_mapping = {v: k for k, v in self.column_name_mapping.items()}
if self.column_name_mapping:
invert_column_name_mapping = {
v: k for k, v in self.column_name_mapping.items()
}
else:
invert_column_name_mapping = {}

for subset_name, subset in self.spec.produces.items():
for field_name, field in subset.fields.items():
subset_field_name = f"{subset_name}_{field_name}"
column_name = invert_column_name_mapping.get(
subset_field_name,
subset_field_name,
)
columns.append(column_name)

logger.debug(f"Columns to keep: {columns}")
dask_df = dd.read_parquet(f"hf://datasets/{self.dataset_name}", columns=columns)

# 2) Make sure images are bytes instead of dicts
if self.image_column_names is not None:
column_name = f"{subset_name}_{field_name}"
if (
invert_column_name_mapping
and column_name in invert_column_name_mapping
):
columns.append(invert_column_name_mapping[column_name])
else:
columns.append(column_name)

if self.index_column is not None:
Review comment (contributor author):
This is needed to keep the index column that will be dropped later on (if it exists). We weren't picking it up before since it was not in the component spec or the remapping dict. A simplified sketch of this column-selection logic follows the end of this file's diff.

columns.append(self.index_column)

return columns

def convert_images_to_bytes(self, dask_df) -> dd.DataFrame:
if self.image_column_names:
for image_column_name in self.image_column_names:
dask_df[image_column_name] = dask_df[image_column_name].map(
lambda x: x["bytes"],
meta=("bytes", bytes),
)

# 3) Rename columns
logger.info("Renaming columns...")
dask_df = dask_df.rename(columns=self.column_name_mapping)

# 4) Optional: only return specific amount of rows
if self.n_rows_to_load > 0:
partitions_length = 0
npartitions = 1
for npartitions, partition in enumerate(dask_df.partitions, start=1):
if partitions_length >= self.n_rows_to_load:
logger.info(
f"""Required number of partitions to load\n
{self.n_rows_to_load} is {npartitions}""",
)
break
partitions_length += len(partition)
dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions)
dask_df = dd.from_pandas(dask_df, npartitions=npartitions)
return dask_df

# 5) Set the index
if self.index_column == "None":
def set_df_index(self, dask_df: dd.DataFrame) -> dd.DataFrame:
if self.index_column is None:
logger.info(
"Index column not specified, setting a globally unique index",
)
@@ -122,3 +113,43 @@ def _get_meta_df() -> pd.DataFrame:
dask_df = dask_df.set_index(self.index_column, drop=True)

return dask_df

def return_subset_of_df(self, dask_df: dd.DataFrame) -> dd.DataFrame:
if self.n_rows_to_load is not None:
partitions_length = 0
npartitions = 1
for npartitions, partition in enumerate(dask_df.partitions, start=1):
if partitions_length >= self.n_rows_to_load:
logger.info(
f"""Required number of partitions to load\n
{self.n_rows_to_load} is {npartitions}""",
)
break
partitions_length += len(partition)
dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions)
dask_df = dd.from_pandas(dask_df, npartitions=npartitions)
return dask_df

def load(self) -> dd.DataFrame:
# 1) Load data, read as Dask dataframe
logger.info("Loading dataset from the hub...")

columns = self.get_columns_to_keep()

logger.debug(f"Columns to keep: {columns}")
dask_df = dd.read_parquet(f"hf://datasets/{self.dataset_name}", columns=columns)

# 2) Make sure images are bytes instead of dicts
dask_df = self.convert_images_to_bytes(dask_df)

# 3) Rename columns
logger.info("Renaming columns...")
dask_df = dask_df.rename(columns=self.column_name_mapping)

# 4) Optional: only return specific amount of rows
dask_df = self.return_subset_of_df(dask_df)

# 5) Set the index
dask_df = self.set_df_index(dask_df)

return dask_df
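
A minimal sketch of the column-selection logic referenced in the review comment above, assuming a simplified stand-in for the Fondant ComponentSpec: the dict-of-lists `produces` argument, the `columns_to_keep` helper name, and the example values are illustrative only, not the real API. It shows why the index column must be appended explicitly before the parquet read, since it appears in neither the produced subsets nor the remapping dict.

# Hedged sketch; simplified stand-in for ComponentSpec-driven column selection.
import typing as t

def columns_to_keep(
    produces: t.Dict[str, t.List[str]],      # subset name -> field names (simplified spec)
    column_name_mapping: t.Optional[dict],   # source column -> fondant column
    index_column: t.Optional[str],
) -> t.List[str]:
    # Invert the mapping so fondant names can be traced back to source columns.
    inverted = (
        {v: k for k, v in column_name_mapping.items()} if column_name_mapping else {}
    )
    columns = []
    for subset_name, field_names in produces.items():
        for field_name in field_names:
            fondant_name = f"{subset_name}_{field_name}"
            columns.append(inverted.get(fondant_name, fondant_name))
    # Without this step the index column is silently dropped at read time,
    # because it is in neither the component spec nor the remapping dict.
    if index_column is not None:
        columns.append(index_column)
    return columns

# Example: the index column "id" is kept alongside the remapped source column.
print(columns_to_keep({"images": ["data"]}, {"image": "images_data"}, "id"))
# -> ['image', 'id']
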
1 change: 0 additions & 1 deletion components/load_from_parquet/fondant_component.yaml
@@ -19,7 +19,6 @@ args:
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: -1
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str
94 changes: 58 additions & 36 deletions components/load_from_parquet/src/main.py
@@ -19,8 +19,8 @@ def __init__(
spec: ComponentSpec,
*_,
dataset_uri: str,
column_name_mapping: dict,
n_rows_to_load: int,
column_name_mapping: t.Optional[dict],
n_rows_to_load: t.Optional[int],
index_column: t.Optional[str],
) -> None:
"""
@@ -39,49 +39,34 @@ def __init__(
self.index_column = index_column
self.spec = spec

def load(self) -> dd.DataFrame:
# 1) Load data, read as Dask dataframe
logger.info("Loading dataset from the file...")

def get_columns_to_keep(self) -> t.List[str]:
# Only read required columns
columns = []
if self.column_name_mapping is not None:

if self.column_name_mapping:
invert_column_name_mapping = {
v: k for k, v in self.column_name_mapping.items()
}
for subset_name, subset in self.spec.produces.items():
for field_name, field in subset.fields.items():
subset_field_name = f"{subset_name}_{field_name}"
column_name = invert_column_name_mapping.get(
subset_field_name,
subset_field_name,
)
columns.append(column_name)
else:
invert_column_name_mapping = {}

logger.debug(f"Columns to keep: {columns}")
dask_df = dd.read_parquet(self.dataset_uri, columns=columns)
for subset_name, subset in self.spec.produces.items():
for field_name, field in subset.fields.items():
column_name = f"{subset_name}_{field_name}"
if (
invert_column_name_mapping
and column_name in invert_column_name_mapping
):
columns.append(invert_column_name_mapping[column_name])
else:
columns.append(column_name)

# 2) Rename columns
if self.column_name_mapping:
logger.info("Renaming columns...")
dask_df = dask_df.rename(columns=self.column_name_mapping)
if self.index_column is not None:
columns.append(self.index_column)

# 3) Optional: only return specific amount of rows
if self.n_rows_to_load > 0:
partitions_length = 0
npartitions = 1
for npartitions, partition in enumerate(dask_df.partitions, start=1):
if partitions_length >= self.n_rows_to_load:
logger.info(
f"""Required number of partitions to load\n
{self.n_rows_to_load} is {npartitions}""",
)
break
partitions_length += len(partition)
dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions)
dask_df = dd.from_pandas(dask_df, npartitions=npartitions)
return columns

# 4) Set the index
def set_df_index(self, dask_df: dd.DataFrame) -> dd.DataFrame:
if self.index_column is None:
logger.info(
"Index column not specified, setting a globally unique index",
@@ -114,3 +99,40 @@ def _get_meta_df() -> pd.DataFrame:
dask_df = dask_df.set_index(self.index_column, drop=True)

return dask_df

def return_subset_of_df(self, dask_df: dd.DataFrame) -> dd.DataFrame:
if self.n_rows_to_load is not None:
partitions_length = 0
npartitions = 1
for npartitions, partition in enumerate(dask_df.partitions, start=1):
if partitions_length >= self.n_rows_to_load:
logger.info(
f"""Required number of partitions to load\n
{self.n_rows_to_load} is {npartitions}""",
)
break
partitions_length += len(partition)
dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions)
dask_df = dd.from_pandas(dask_df, npartitions=npartitions)
return dask_df

def load(self) -> dd.DataFrame:
# 1) Load data, read as Dask dataframe
logger.info("Loading dataset from the hub...")

columns = self.get_columns_to_keep()

logger.debug(f"Columns to keep: {columns}")
dask_df = dd.read_parquet(self.dataset_uri, columns=columns)

# 2) Rename columns
if self.column_name_mapping:
logger.info("Renaming columns...")
dask_df = dask_df.rename(columns=self.column_name_mapping)

# 4) Optional: only return specific amount of rows
dask_df = self.return_subset_of_df(dask_df)

# 5) Set the index
dask_df = self.set_df_index(dask_df)
return dask_df
6 changes: 3 additions & 3 deletions components/write_to_hf_hub/src/main.py
@@ -41,8 +41,8 @@ def __init__(
hf_token: str,
username: str,
dataset_name: str,
image_column_names: list,
column_name_mapping: dict,
image_column_names: t.Optional[list],
column_name_mapping: t.Optional[dict],
):
"""
Args:
@@ -91,7 +91,7 @@ def write(
# Map image column to hf data format
feature_encoder = datasets.Image(decode=True)

if self.image_column_names:
if self.image_column_names is not None:
for image_column_name in self.image_column_names:
dataframe[image_column_name] = dataframe[image_column_name].map(
lambda x: convert_bytes_to_image(x, feature_encoder),
@@ -15,4 +15,4 @@ args:
n_records_to_download:
description: Number of records to download
type: int
default: -1
default: None
@@ -18,7 +18,7 @@ def __init__(
self,
*_,
common_crawl_indices: t.List[str],
n_records_to_download: int,
n_records_to_download: t.Optional[int] = None,
):
self.index_urls = [
self.build_index_url(index_name) for index_name in common_crawl_indices
@@ -38,7 +38,7 @@ def load(self) -> dd.DataFrame:
warc_urls.extend([line.decode() for line in extracted.split(b"\n")])

df = pd.DataFrame(warc_urls, columns=["warc_url"])
if self.n_records_to_download > 0:
if self.n_records_to_download is not None:
df = df.head(self.n_records_to_download)

return dd.from_pandas(df, npartitions=len(df) // 100)
@@ -12,4 +12,4 @@ args:
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: -1
default: None
@@ -114,7 +114,7 @@ def load(self) -> dd.DataFrame:

pandas_df = pd.DataFrame(prompts, columns=["prompts_text"])

if self.n_rows_to_load > 0:
if self.n_rows_to_load:
pandas_df = pandas_df.head(self.n_rows_to_load)

df = dd.from_pandas(pandas_df, npartitions=1)
@@ -46,6 +46,7 @@ args:
column_name_mapping:
description: Mapping of the consumed hub dataset to fondant column names
type: dict
default: {}
image_column_names:
description: Optional argument, a list containing the original image column names in case the
dataset on the hub contains them. Used to format the image from HF hub format to a byte string.
@@ -54,7 +55,7 @@ args:
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: -1
default: None
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str
@@ -27,6 +27,7 @@ args:
column_name_mapping:
description: Mapping of the consumed hub dataset to fondant column names
type: dict
default: {}
image_column_names:
description: Optional argument, a list containing the original image column names in case the
dataset on the hub contains them. Used to format the image from HF hub format to a byte string.
@@ -35,7 +36,6 @@ args:
n_rows_to_load:
description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
type: int
default: -1
index_column:
description: Column to set index to in the load component, if not specified a default globally unique index will be set
type: str