Create simple storage writer #826

Merged (19 commits) on Feb 2, 2024
28 changes: 28 additions & 0 deletions components/write_to_file/Dockerfile
@@ -0,0 +1,28 @@
FROM --platform=linux/amd64 python:3.10-slim as base

# System dependencies
RUN apt-get update && \
    apt-get upgrade -y && \
    apt-get install git -y

# Install requirements
COPY requirements.txt /
RUN pip3 install --no-cache-dir -r requirements.txt

# Install Fondant
# This is split from other requirements to leverage caching
ARG FONDANT_VERSION=main
RUN pip3 install fondant[component,aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION}

# Set the working directory to the component folder
WORKDIR /component
COPY src/ src/

FROM base as test
COPY tests/ tests/
RUN pip3 install --no-cache-dir -r tests/requirements.txt
RUN python -m pytest tests

FROM base
WORKDIR /component/src
ENTRYPOINT ["fondant", "execute", "main"]
75 changes: 75 additions & 0 deletions components/write_to_file/README.md
@@ -0,0 +1,75 @@
# Write to file

<a id="write_to_file#description"></a>
## Description
A Fondant component to write a dataset to file on a local machine or to a cloud storage bucket. The dataset can be written as csv or parquet.

<a id="write_to_file#inputs_outputs"></a>
## Inputs / outputs

<a id="write_to_file#consumes"></a>
### Consumes

**This component can consume additional fields**
- <field_name>: <dataset_field_name>
This defines a mapping to update the fields consumed by the operation as defined in the component spec.
The keys are the names of the fields to be received by the component, while the values are
the names of the fields to map from the input dataset.

See the usage example below on how to define a field name for additional fields.
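As a minimal sketch of the mapping direction, assuming the input dataset has a field called `generated_text` that should be written out under the name `text` (both names are purely illustrative):

```python
# Illustrative mapping only: "text" is the name the component receives the field under,
# "generated_text" is the field name in the input dataset.
consumes = {"text": "generated_text"}
```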




<a id="write_to_file#produces"></a>
### Produces


**This component does not produce data.**
**Review comment (Collaborator):** Correct, but it might confuse users. Maybe word it like: "This component does not produce a dataframe for downstream processing."


<a id="write_to_file#arguments"></a>
## Arguments

The component takes the following arguments to alter its behavior:

| argument | type | description | default |
| -------- | ---- | ----------- | ------- |
| path | str | Path to store the dataset; either a local path or a cloud storage bucket must be specified. A separate filename is generated for each partition. If you are using the local runner and exporting the data to a local directory, ensure that you mount the path to the directory using the `--extra-volumes` argument. | / |
| format | str | Format for storing the dataframe; either `csv` or `parquet`. By default, `parquet` is used. CSV files contain the column names as a header row and use a comma as a delimiter. | parquet |

<a id="write_to_file#usage"></a>
## Usage

You can add this component to your pipeline using the following code:

```python
from fondant.pipeline import Pipeline


pipeline = Pipeline(...)

dataset = pipeline.read(...)

dataset = dataset.apply(...)

dataset.write(
    "write_to_file",
    arguments={
        # Add arguments
        # "path": ,
        # "format": "parquet",
    },
    consumes={
        <field_name>: <dataset_field_name>,
        ..., # Add fields
    },
)
```
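
For example, a filled-in call might look like the following; the `text` field name, the `/data/export` path, and the volume mount are illustrative assumptions, not component defaults:

```python
# Illustrative example: assumes the upstream dataset exposes a "text" field and that the
# pipeline is run with `--extra-volumes ./data:/data`, so /data/export maps to ./data/export.
dataset.write(
    "write_to_file",
    arguments={
        "path": "/data/export",
        "format": "parquet",
    },
    consumes={
        "text": "text",
    },
)
```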

<a id="write_to_file#testing"></a>
## Testing

You can run the tests using docker with BuildKit. From this directory, run:
```
docker build . --target test
```
26 changes: 26 additions & 0 deletions components/write_to_file/fondant_component.yaml
@@ -0,0 +1,26 @@
name: Write to file
description: >-
  A Fondant component to write a dataset to file on a local machine or to a cloud storage bucket.
  The dataset can be written as csv or parquet.
image: 'fndnt/write_to_file:dev'
tags:
  - Data writing

consumes:
  additionalProperties: true

args:
  path:
    description: >-
      Path to store the dataset; either a local path or a cloud storage bucket
      must be specified. A separate filename is generated for each partition.
      If you are using the local runner and exporting the data to a local directory,
      ensure that you mount the path to the directory using the `--extra-volumes` argument.
    type: str
  format:
    description: >-
      Format for storing the dataframe; either `csv` or `parquet`. By default, `parquet` is used.
      CSV files contain the column names as a header row and use a comma as a delimiter.
    type: str
    default: parquet
Empty file.
29 changes: 29 additions & 0 deletions components/write_to_file/src/main.py
@@ -0,0 +1,29 @@
import dask.dataframe as dd
from fondant.component import DaskWriteComponent


class WriteToFile(DaskWriteComponent):
    def __init__(self, *, path: str, format: str):
        """Initialize the write to file component."""
        self.path = path
        self.format = format

    def write(self, dataframe: dd.DataFrame) -> None:
        """
        Writes the data from the given Dask DataFrame to a file either locally or
        to a remote storage bucket.

        Args:
            dataframe (dd.DataFrame): The Dask DataFrame containing the data to be written.
        """
        if self.format.lower() == "csv":
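            # Dask replaces the "*" in the filename pattern with the partition index,
            # so each partition is written to its own file (export-0.csv, export-1.csv, ...).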
            self.path = self.path + "/export-*.csv"
            dataframe.to_csv(self.path)
        elif self.format.lower() == "parquet":
            dataframe.to_parquet(self.path)
        else:
            msg = (
                f"Unsupported file format {self.format}. Writing to file is only "
                f"supported for `csv` and `parquet`."
            )
            raise ValueError(msg)
2 changes: 2 additions & 0 deletions components/write_to_file/tests/pytest.ini
@@ -0,0 +1,2 @@
[pytest]
pythonpath = ../src
1 change: 1 addition & 0 deletions components/write_to_file/tests/requirements.txt
@@ -0,0 +1 @@
pytest==7.4.2
58 changes: 58 additions & 0 deletions components/write_to_file/tests/write_to_file_test.py
@@ -0,0 +1,58 @@
import tempfile

import dask.dataframe as dd
import pandas as pd

from src.main import WriteToFile


def test_write_to_csv():
    """Test case for write to file component."""
    with tempfile.TemporaryDirectory() as tmpdir:
        entries = 10

        dask_dataframe = dd.DataFrame.from_dict(
            {
                "text": [
                    "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo",
                ]
                * entries,
            },
            npartitions=1,
        )

        component = WriteToFile(
            path=tmpdir,
            format="csv",
        )

        component.write(dask_dataframe)

        df = pd.read_csv(tmpdir + "/export-0.csv")
        assert len(df) == entries


def test_write_to_parquet():
    """Test case for write to file component."""
    with tempfile.TemporaryDirectory() as tmpdir:
        entries = 10

        dask_dataframe = dd.DataFrame.from_dict(
            {
                "text": [
                    "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo",
                ]
                * entries,
            },
            npartitions=1,
        )

        component = WriteToFile(
            path=tmpdir,
            format="parquet",
        )

        component.write(dask_dataframe)

        ddf = dd.read_parquet(tmpdir)
        assert len(ddf) == entries
5 changes: 4 additions & 1 deletion examples/sample_pipeline/README.md
@@ -7,7 +7,10 @@ returns the received dataframe.
The pipeline can be executed with the Fondant CLI:

```bash
-fondant run local pipeline.py
+fondant run local pipeline.py --extra-volumes ./data:/data
```

> We are mounting the data folder using `--extra-volumes` to ensure that the component
> can load the sample data and write to the folder.

The automated integration test will use the `run.sh` script.
4 changes: 3 additions & 1 deletion examples/sample_pipeline/pipeline.py
@@ -57,8 +57,10 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        return dataframe


-_ = dataset.apply(
+dataset = dataset.apply(
    ref=CalculateChunkLength,
    produces={"chunk_length": pa.int32()},
    arguments={"arg_x": "value_x"},
)

dataset.write(ref="write_to_file", arguments={"path": "/data/export"})
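
A quick way to sanity-check the export after running the pipeline is to read it back on the host. This sketch assumes the default `parquet` format and the `--extra-volumes ./data:/data` mount described in the README, so the component's `/data/export` path corresponds to `./data/export` locally:

```python
# Sketch: read the exported dataset back with Dask to verify the write succeeded.
# Assumes the pipeline was run with `--extra-volumes ./data:/data` and parquet output.
import dask.dataframe as dd

ddf = dd.read_parquet("./data/export")
print(len(ddf), "rows exported")
print(ddf.head())
```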