diff --git a/components/write_to_file/Dockerfile b/components/write_to_file/Dockerfile
new file mode 100644
index 00000000..78e49c7d
--- /dev/null
+++ b/components/write_to_file/Dockerfile
@@ -0,0 +1,28 @@
+FROM --platform=linux/amd64 python:3.10-slim as base
+
+# System dependencies
+RUN apt-get update && \
+    apt-get upgrade -y && \
+    apt-get install git -y
+
+# Install requirements
+COPY requirements.txt /
+RUN pip3 install --no-cache-dir -r requirements.txt
+
+# Install Fondant
+# This is split from other requirements to leverage caching
+ARG FONDANT_VERSION=main
+RUN pip3 install fondant[component,aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION}
+
+# Set the working directory to the component folder
+WORKDIR /component
+COPY src/ src/
+
+FROM base as test
+COPY tests/ tests/
+RUN pip3 install --no-cache-dir -r tests/requirements.txt
+RUN python -m pytest tests
+
+FROM base
+WORKDIR /component/src
+ENTRYPOINT ["fondant", "execute", "main"]
diff --git a/components/write_to_file/README.md b/components/write_to_file/README.md
new file mode 100644
index 00000000..9c4c26ab
--- /dev/null
+++ b/components/write_to_file/README.md
@@ -0,0 +1,75 @@
+# Write to file
+
+
+## Description
+A Fondant component to write a dataset to a file on the local machine or to a cloud storage bucket. The dataset can be written as csv or parquet.
+
+
+## Inputs / outputs
+
+
+### Consumes
+
+**This component can consume additional fields**
+- <field_name>: <dataset_field_name>
+This defines a mapping to update the fields consumed by the operation as defined in the component spec.
+The keys are the names of the fields to be received by the component, while the values are
+the names of the fields to map from the input dataset.
+
+See the usage example below on how to define a field name for additional fields.
+
+
+
+
+### Produces
+
+
+**This component does not produce data.**
+
+
+## Arguments
+
+The component takes the following arguments to alter its behavior:
+
+| argument | type | description | default |
+| -------- | ---- | ----------- | ------- |
+| path | str | Path to store the dataset; either a local path or a cloud storage bucket must be specified. A separate filename will be generated for each partition. If you are using the local runner and exporting the data to a local directory, ensure that you mount the path to the directory using the `--extra-volumes` argument. | / |
+| format | str | Format for storing the dataframe, either `csv` or `parquet`. By default, `parquet` is used. CSV files contain the column names as a header and use a comma as a delimiter. | parquet |
+
+
+## Usage
+
+You can add this component to your pipeline using the following code:
+
+```python
+from fondant.pipeline import Pipeline
+
+
+pipeline = Pipeline(...)
+
+dataset = pipeline.read(...)
+
+dataset = dataset.apply(...)
+
+dataset.write(
+    "write_to_file",
+    arguments={
+        # Add arguments
+        # "path": ,
+        # "format": "parquet",
+    },
+    consumes={
+        <field_name>: <dataset_field_name>,
+        ..., # Add fields
+    },
+)
+```
+
+
+## Testing
+
+You can run the tests using docker with BuildKit. From this directory, run:
+```
+docker build . --target test
+```
diff --git a/components/write_to_file/fondant_component.yaml b/components/write_to_file/fondant_component.yaml
new file mode 100644
index 00000000..efbe96ca
--- /dev/null
+++ b/components/write_to_file/fondant_component.yaml
@@ -0,0 +1,26 @@
+name: Write to file
+description: >-
+  A Fondant component to write a dataset to a file on the local machine or to a cloud storage bucket.
+  The dataset can be written as csv or parquet.
+image: 'fndnt/write_to_file:dev'
+tags:
+  - Data writing
+
+consumes:
+  additionalProperties: true
+
+args:
+  path:
+    description: >-
+      Path to store the dataset; either a local path or a cloud storage bucket
+      must be specified. A separate filename will be generated for each partition.
+      If you are using the local runner and exporting the data to a local directory,
+      ensure that you mount the path to the directory using the `--extra-volumes` argument.
+    type: str
+  format:
+    description: >-
+      Format for storing the dataframe, either `csv` or `parquet`. By default,
+      `parquet` is used.
+      CSV files contain the column names as a header and use a comma as a delimiter.
+    type: str
+    default: parquet
\ No newline at end of file
diff --git a/components/write_to_file/requirements.txt b/components/write_to_file/requirements.txt
new file mode 100644
index 00000000..e69de29b
diff --git a/components/write_to_file/src/main.py b/components/write_to_file/src/main.py
new file mode 100644
index 00000000..34e45d5c
--- /dev/null
+++ b/components/write_to_file/src/main.py
@@ -0,0 +1,29 @@
+import dask.dataframe as dd
+from fondant.component import DaskWriteComponent
+
+
+class WriteToFile(DaskWriteComponent):
+    def __init__(self, *, path: str, format: str):
+        """Initialize the write to file component."""
+        self.path = path
+        self.format = format
+
+    def write(self, dataframe: dd.DataFrame) -> None:
+        """
+        Write the data from the given Dask DataFrame to a file, either locally or
+        to a remote storage bucket.
+
+        Args:
+            dataframe (dd.DataFrame): The Dask DataFrame containing the data to be written.
+        """
+        if self.format.lower() == "csv":
+            path = self.path + "/export-*.csv"
+            dataframe.to_csv(path)
+        elif self.format.lower() == "parquet":
+            dataframe.to_parquet(self.path)
+        else:
+            msg = (
+                f"Unsupported file format: {self.format}. Writing to file is only "
+                "supported for `csv` and `parquet`."
+            )
+            raise ValueError(msg)
diff --git a/components/write_to_file/tests/pytest.ini b/components/write_to_file/tests/pytest.ini
new file mode 100644
index 00000000..bf6a8a51
--- /dev/null
+++ b/components/write_to_file/tests/pytest.ini
@@ -0,0 +1,2 @@
+[pytest]
+pythonpath = ../src
\ No newline at end of file
diff --git a/components/write_to_file/tests/requirements.txt b/components/write_to_file/tests/requirements.txt
new file mode 100644
index 00000000..2a929edc
--- /dev/null
+++ b/components/write_to_file/tests/requirements.txt
@@ -0,0 +1 @@
+pytest==7.4.2
diff --git a/components/write_to_file/tests/write_to_file_test.py b/components/write_to_file/tests/write_to_file_test.py
new file mode 100644
index 00000000..ebd8ad92
--- /dev/null
+++ b/components/write_to_file/tests/write_to_file_test.py
@@ -0,0 +1,58 @@
+import tempfile
+
+import dask.dataframe as dd
+import pandas as pd
+
+from src.main import WriteToFile
+
+
+def test_write_to_csv():
+    """Test case for write to file component."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        entries = 10
+
+        dask_dataframe = dd.DataFrame.from_dict(
+            {
+                "text": [
+                    "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo",
+                ]
+                * entries,
+            },
+            npartitions=1,
+        )
+
+        component = WriteToFile(
+            path=tmpdir,
+            format="csv",
+        )
+
+        component.write(dask_dataframe)
+
+        df = pd.read_csv(tmpdir + "/export-0.csv")
+        assert len(df) == entries
+
+
+def test_write_to_parquet():
+    """Test case for write to file component."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        entries = 10
+
+        dask_dataframe = dd.DataFrame.from_dict(
+            {
+                "text": [
+                    "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo",
+                ]
+                * entries,
+            },
+            npartitions=1,
+        )
+
+        component = WriteToFile(
+            path=tmpdir,
+            format="parquet",
+        )
+
+        component.write(dask_dataframe)
+
+        ddf = dd.read_parquet(tmpdir)
+        assert len(ddf) == entries
diff --git a/examples/sample_pipeline/README.md b/examples/sample_pipeline/README.md
index 6ab76c9e..d8960dc9 100644
--- a/examples/sample_pipeline/README.md
+++ b/examples/sample_pipeline/README.md
@@ -7,7 +7,10 @@ returns the received dataframe.
 The pipeline can be executed with the Fondant cli:
 
 ```bash
-fondant run local pipeline.py
+fondant run local pipeline.py --extra-volumes ./data:/data
 ```
 
+> We are mounting the data folder using `--extra-volumes` to ensure that the component
+> can load the sample data and write to the folder.
+
 The automated integration test will use the `run.sh` script.
\ No newline at end of file
diff --git a/examples/sample_pipeline/pipeline.py b/examples/sample_pipeline/pipeline.py
index 875f9c58..f64024b1 100644
--- a/examples/sample_pipeline/pipeline.py
+++ b/examples/sample_pipeline/pipeline.py
@@ -57,8 +57,10 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
         return dataframe
 
 
-_ = dataset.apply(
+dataset = dataset.apply(
     ref=CalculateChunkLength,
     produces={"chunk_length": pa.int32()},
     arguments={"arg_x": "value_x"},
 )
+
+dataset.write(ref="write_to_file", arguments={"path": "/data/export"})
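
For context on the `export-*.csv` pattern used in `WriteToFile.write()` and asserted in the csv test, the sketch below (not part of the diff) shows how Dask expands a `*` in the output path into one file per partition. The toy dataframe and temporary directory are illustrative only.

```python
import tempfile

import dask.dataframe as dd
import pandas as pd

with tempfile.TemporaryDirectory() as tmpdir:
    # Two partitions, so two output files will be produced.
    ddf = dd.from_pandas(
        pd.DataFrame({"text": ["a", "b", "c", "d"]}),
        npartitions=2,
    )

    # Writes <tmpdir>/export-0.csv and <tmpdir>/export-1.csv,
    # mirroring WriteToFile.write() with format="csv".
    written = ddf.to_csv(tmpdir + "/export-*.csv")
    print(written)  # list of the generated file paths
```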