Create simple storage writer #826

@@ -0,0 +1,28 @@
FROM --platform=linux/amd64 python:3.8-slim as base

# System dependencies
RUN apt-get update && \
    apt-get upgrade -y && \
    apt-get install git -y

# Install requirements
COPY requirements.txt /
RUN pip3 install --no-cache-dir -r requirements.txt

# Install Fondant
# This is split from other requirements to leverage caching
ARG FONDANT_VERSION=main
RUN pip3 install fondant[component,aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION}

# Set the working directory to the component folder
WORKDIR /component
COPY src/ src/

FROM base as test
COPY tests/ tests/
RUN pip3 install --no-cache-dir -r tests/requirements.txt
RUN python -m pytest tests

FROM base
WORKDIR /component/src
ENTRYPOINT ["fondant", "execute", "main"]

@@ -0,0 +1,75 @@
# Write to file

<a id="write_to_file#description"></a>
## Description
A Fondant component to write a dataset to file on a local machine or to a cloud storage bucket. The dataset can be written as csv or parquet.

<a id="write_to_file#inputs_outputs"></a>
## Inputs / outputs

<a id="write_to_file#consumes"></a>
### Consumes

**This component can consume additional fields**
- <field_name>: <dataset_field_name>
This defines a mapping to update the fields consumed by the operation as defined in the component spec.
The keys are the names of the fields to be received by the component, while the values are
the names of the fields to map from the input dataset.

See the usage example below on how to define a field name for additional fields.
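
For example, a hypothetical mapping that lets the component receive a dataset field named `generated_text` under the column name `text` could look like this (both names are illustrative, not part of the component spec):

```python
# Illustrative mapping only: the key "text" is the field name the component
# will receive, the value "generated_text" is the field in the input dataset.
consumes = {
    "text": "generated_text",
}
```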

<a id="write_to_file#produces"></a>
### Produces

**This component does not produce data.**

**Review comment:** Correct, but it might confuse users. Maybe word it like: …

<a id="write_to_file#arguments"></a>
## Arguments

The component takes the following arguments to alter its behavior:

| argument | type | description | default |
| -------- | ---- | ----------- | ------- |
| path | str | Path to store the dataset, whether it's a local path or a cloud storage bucket, must be specified. A separate filename will be generated for each partition. If you are using the local runner and export the data to a local directory, ensure that you mount the path to the directory using the `--extra-volumes` argument. | / |
| format | str | Format for storing the dataframe, can be either `csv` or `parquet`. | csv |

**Review comment:** Does the CSV write headers? Does it use …

**Reply:** CSV is written with headers and comma separated. I'll add to the description.

<a id="write_to_file#usage"></a>
## Usage

You can add this component to your pipeline using the following code:

```python
from fondant.pipeline import Pipeline


pipeline = Pipeline(...)

dataset = pipeline.read(...)

dataset = dataset.apply(...)

dataset.write(
    "write_to_file",
    arguments={
        # Add arguments
        # "path": ,
        # "format": "csv",
    },
    consumes={
        <field_name>: <dataset_field_name>,
        ..., # Add fields
    },
)
```
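
As a concrete illustration, a filled-in call might look as follows; the path and field names are hypothetical placeholders, not values prescribed by the component:

```python
# Hypothetical example: write the dataset as Parquet to a local directory.
# "text" is the column the component receives, "generated_text" is the
# corresponding field in the input dataset (both names are illustrative).
dataset.write(
    "write_to_file",
    arguments={
        "path": "/data/export",  # local path or cloud storage bucket URI
        "format": "parquet",
    },
    consumes={
        "text": "generated_text",
    },
)
```

When using the local runner with a local directory such as `/data/export`, remember to mount it through the `--extra-volumes` argument as described above.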

<a id="write_to_file#testing"></a>
## Testing

You can run the tests using Docker with BuildKit. From this directory, run:
```
docker build . --target test
```

@@ -0,0 +1,23 @@
name: Write to file
description: >-
  A Fondant component to write a dataset to file on a local machine or to a cloud storage bucket.
  The dataset can be written as csv or parquet.
image: 'fndnt/write_to_file:dev'
tags:
  - Data writing

consumes:
  additionalProperties: true

args:
  path:
    description: >-
      Path to store the dataset, whether it's a local path or a cloud storage bucket,
      must be specified. A separate filename will be generated for each partition.
      If you are using the local runner and export the data to a local directory,
      ensure that you mount the path to the directory using the `--extra-volumes` argument.
    type: str
  format:
    description: Format for storing the dataframe can be either `csv` or `parquet`.
    type: str
    default: csv

@@ -0,0 +1,29 @@
import dask.dataframe as dd
from fondant.component import DaskWriteComponent


class WriteToFile(DaskWriteComponent):
    def __init__(self, *, path: str, format: str):
        """Initialize the write to file component."""
        self.path = path
        self.format = format

    def write(self, dataframe: dd.DataFrame) -> None:
        """
        Writes the data from the given Dask DataFrame to a file either locally or
        to a remote storage bucket.

        Args:
            dataframe (dd.DataFrame): The Dask DataFrame containing the data to be written.
        """
        if self.format.lower() == "csv":
            self.path = self.path + "/export-*.csv"
            dataframe.to_csv(self.path)
        elif self.format.lower() == "parquet":
            dataframe.to_parquet(self.path)
        else:
            msg = (
                f"Not supported file format {self.format}. Writing to file is only "
                f"supported for `csv` and `parquet`."
            )
            raise ValueError(msg)
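
For the `csv` branch, the `export-*.csv` glob makes Dask write one file per partition (`export-0.csv`, `export-1.csv`, and so on), which is what the tests below rely on. A minimal sketch of exercising the component directly outside a pipeline, assuming only that `dask`, `pandas`, and this `WriteToFile` class are importable, could look like this:

```python
import tempfile

import dask.dataframe as dd
import pandas as pd

from src.main import WriteToFile  # import path assumes the same layout as the tests below

with tempfile.TemporaryDirectory() as tmpdir:
    # Two partitions -> two CSV files: export-0.csv and export-1.csv.
    ddf = dd.from_pandas(pd.DataFrame({"text": ["a", "b", "c", "d"]}), npartitions=2)

    writer = WriteToFile(path=tmpdir, format="csv")
    writer.write(ddf)

    print(pd.read_csv(f"{tmpdir}/export-0.csv"))
```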

@@ -0,0 +1,2 @@
[pytest]
pythonpath = ../src

@@ -0,0 +1 @@
pytest==7.4.2

@@ -0,0 +1,58 @@
import tempfile

import dask.dataframe as dd
import pandas as pd

from src.main import WriteToFile


def test_write_to_csv():
    """Test case for write to file component."""
    with tempfile.TemporaryDirectory() as tmpdir:
        entries = 10

        dask_dataframe = dd.DataFrame.from_dict(
            {
                "text": [
                    "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo",
                ]
                * entries,
            },
            npartitions=1,
        )

        component = WriteToFile(
            path=tmpdir,
            format="csv",
        )

        component.write(dask_dataframe)

        df = pd.read_csv(tmpdir + "/export-0.csv")
        assert len(df) == entries


def test_write_to_parquet():
    """Test case for write to file component."""
    with tempfile.TemporaryDirectory() as tmpdir:
        entries = 10

        dask_dataframe = dd.DataFrame.from_dict(
            {
                "text": [
                    "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo",
                ]
                * entries,
            },
            npartitions=1,
        )

        component = WriteToFile(
            path=tmpdir,
            format="parquet",
        )

        component.write(dask_dataframe)

        ddf = dd.read_parquet(tmpdir)
        assert len(ddf) == entries

**Review comment:** We might want to start using newer Python versions, since 3.8 will reach end-of-life before you know it (https://devguide.python.org/versions/).