Create simple storage writer (#826)
Add a component that writes a Dask dataframe to a file, as either CSV or Parquet.

Fix #824

---------

Co-authored-by: Philippe Moussalli <[email protected]>
mrchtr and PhilippeMoussalli authored Feb 2, 2024
1 parent 2e912e0 commit c4cd3d3
Showing 10 changed files with 226 additions and 2 deletions.
28 changes: 28 additions & 0 deletions components/write_to_file/Dockerfile
@@ -0,0 +1,28 @@
FROM --platform=linux/amd64 python:3.10-slim as base

# System dependencies
RUN apt-get update && \
    apt-get upgrade -y && \
    apt-get install git -y

# Install requirements
COPY requirements.txt /
RUN pip3 install --no-cache-dir -r requirements.txt

# Install Fondant
# This is split from other requirements to leverage caching
ARG FONDANT_VERSION=main
RUN pip3 install fondant[component,aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION}

# Set the working directory to the component folder
WORKDIR /component
COPY src/ src/

FROM base as test
COPY tests/ tests/
RUN pip3 install --no-cache-dir -r tests/requirements.txt
RUN python -m pytest tests

FROM base
WORKDIR /component/src
ENTRYPOINT ["fondant", "execute", "main"]
75 changes: 75 additions & 0 deletions components/write_to_file/README.md
@@ -0,0 +1,75 @@
# Write to file

<a id="write_to_file#description"></a>
## Description
A Fondant component to write a dataset to file on a local machine or to a cloud storage bucket. The dataset can be written as csv or parquet.

<a id="write_to_file#inputs_outputs"></a>
## Inputs / outputs

<a id="write_to_file#consumes"></a>
### Consumes

**This component can consume additional fields**
- <field_name>: <dataset_field_name>
This defines a mapping that updates the fields consumed by the operation, as defined in the component spec.
The keys are the names of the fields to be received by the component, while the values are
the names of the corresponding fields in the input dataset.

See the usage example below on how to define a field name for additional fields.




<a id="write_to_file#produces"></a>
### Produces


**This component does not produce data.**

<a id="write_to_file#arguments"></a>
## Arguments

The component takes the following arguments to alter its behavior:

| argument | type | description | default |
| -------- | ---- | ----------- | ------- |
| path | str | Path to store the dataset; can be a local path or a cloud storage bucket, and must be specified. A separate filename will be generated for each partition. If you are using the local runner and export the data to a local directory, ensure that you mount the path to the directory using the `--extra-volumes` argument. | / |
| format | str | Format for storing the dataframe; can be either `csv` or `parquet`. By default, `parquet` is used. The CSV files contain the column names as a header and use a comma as a delimiter. | parquet |

<a id="write_to_file#usage"></a>
## Usage

You can add this component to your pipeline using the following code:

```python
from fondant.pipeline import Pipeline


pipeline = Pipeline(...)

dataset = pipeline.read(...)

dataset = dataset.apply(...)

dataset.write(
"write_to_file",
arguments={
# Add arguments
# "path": ,
# "format": "parquet",
},
consumes={
<field_name>: <dataset_field_name>,
..., # Add fields
},
)
```

<a id="write_to_file#testing"></a>
## Testing

You can run the tests using docker with BuildKit. From this directory, run:
```bash
docker build . --target test
```
26 changes: 26 additions & 0 deletions components/write_to_file/fondant_component.yaml
@@ -0,0 +1,26 @@
name: Write to file
description: >-
  A Fondant component to write a dataset to file on a local machine or to a cloud storage bucket.
  The dataset can be written as csv or parquet.
image: 'fndnt/write_to_file:dev'
tags:
  - Data writing

consumes:
  additionalProperties: true

args:
  path:
    description: >-
      Path to store the dataset; can be a local path or a cloud storage bucket,
      and must be specified. A separate filename will be generated for each partition.
      If you are using the local runner and export the data to a local directory,
      ensure that you mount the path to the directory using the `--extra-volumes` argument.
    type: str
  format:
    description: >-
      Format for storing the dataframe; can be either `csv` or `parquet`. By default,
      `parquet` is used.
      The CSV files contain the column names as a header and use a comma as a delimiter.
    type: str
    default: parquet
Empty file: components/write_to_file/requirements.txt
29 changes: 29 additions & 0 deletions components/write_to_file/src/main.py
@@ -0,0 +1,29 @@
import dask.dataframe as dd
from fondant.component import DaskWriteComponent


class WriteToFile(DaskWriteComponent):
    def __init__(self, *, path: str, format: str):
        """Initialize the write to file component."""
        self.path = path
        self.format = format

    def write(self, dataframe: dd.DataFrame) -> None:
        """
        Write the data from the given Dask DataFrame to a file, either locally or
        to a remote storage bucket.

        Args:
            dataframe: The Dask DataFrame containing the data to be written.
        """
        if self.format.lower() == "csv":
            self.path = self.path + "/export-*.csv"
            dataframe.to_csv(self.path)
        elif self.format.lower() == "parquet":
            dataframe.to_parquet(self.path)
        else:
            msg = (
                f"Not supported file format {self.format}. Writing to file is only "
                f"supported for `csv` and `parquet`."
            )
            raise ValueError(msg)
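In isolation, the component's format dispatch behaves like this minimal stand-in (an illustrative sketch only; `resolve_writer` is a hypothetical helper, not part of the component's API):

```python
def resolve_writer(fmt: str) -> str:
    """Map a format string to the Dask writer method name (illustrative only)."""
    if fmt.lower() == "csv":
        return "to_csv"
    if fmt.lower() == "parquet":
        return "to_parquet"
    msg = (
        f"Not supported file format {fmt}. Writing to file is only "
        f"supported for `csv` and `parquet`."
    )
    raise ValueError(msg)


print(resolve_writer("CSV"))      # matching is case-insensitive -> to_csv
print(resolve_writer("parquet"))  # -> to_parquet
```

Any other value, such as `json`, raises a `ValueError` before anything is written.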
2 changes: 2 additions & 0 deletions components/write_to_file/tests/pytest.ini
@@ -0,0 +1,2 @@
[pytest]
pythonpath = ../src
1 change: 1 addition & 0 deletions components/write_to_file/tests/requirements.txt
@@ -0,0 +1 @@
pytest==7.4.2
58 changes: 58 additions & 0 deletions components/write_to_file/tests/write_to_file_test.py
@@ -0,0 +1,58 @@
import tempfile

import dask.dataframe as dd
import pandas as pd

from src.main import WriteToFile


def test_write_to_csv():
    """Test case for write to file component."""
    with tempfile.TemporaryDirectory() as tmpdir:
        entries = 10

        dask_dataframe = dd.DataFrame.from_dict(
            {
                "text": [
                    "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo",
                ]
                * entries,
            },
            npartitions=1,
        )

        component = WriteToFile(
            path=tmpdir,
            format="csv",
        )

        component.write(dask_dataframe)

        df = pd.read_csv(tmpdir + "/export-0.csv")
        assert len(df) == entries


def test_write_to_parquet():
    """Test case for write to file component."""
    with tempfile.TemporaryDirectory() as tmpdir:
        entries = 10

        dask_dataframe = dd.DataFrame.from_dict(
            {
                "text": [
                    "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo",
                ]
                * entries,
            },
            npartitions=1,
        )

        component = WriteToFile(
            path=tmpdir,
            format="parquet",
        )

        component.write(dask_dataframe)

        ddf = dd.read_parquet(tmpdir)
        assert len(ddf) == entries
5 changes: 4 additions & 1 deletion examples/sample_pipeline/README.md
@@ -7,7 +7,10 @@ returns the received dataframe.
The pipeline can be executed with the Fondant cli:

```bash
fondant run local pipeline.py
fondant run local pipeline.py --extra-volumes ./data:/data
```

> We are mounting the data folder using `--extra-volumes` to ensure that the component
> can load the sample data and write to the folder.

The automated integration test will use the `run.sh` script.
4 changes: 3 additions & 1 deletion examples/sample_pipeline/pipeline.py
@@ -57,8 +57,10 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
return dataframe


_ = dataset.apply(
dataset = dataset.apply(
ref=CalculateChunkLength,
produces={"chunk_length": pa.int32()},
arguments={"arg_x": "value_x"},
)

dataset.write(ref="write_to_file", arguments={"path": "/data/export"})
