Update lightweight docs #827

Merged: 5 commits on Jan 30, 2024
66 changes: 58 additions & 8 deletions docs/components/lightweight_components.md
@@ -14,8 +14,9 @@ from fondant.component import DaskLoadComponent, PandasTransformComponent
from fondant.pipeline import lightweight_component
import dask.dataframe as dd
import pandas as pd
+import pyarrow as pa

-@lightweight_component
+@lightweight_component(produces={"x": pa.int32(), "y": pa.int32()})
class CreateData(DaskLoadComponent):
    def load(self) -> dd.DataFrame:
        df = pd.DataFrame(
@@ -27,7 +28,7 @@ class CreateData(DaskLoadComponent):
        )
        return dd.from_pandas(df, npartitions=1)

-@lightweight_component
+@lightweight_component(produces={"z": pa.int32()})
class AddNumber(PandasTransformComponent):
    def __init__(self, n: int):
        self.n = n
@@ -45,7 +46,6 @@ To register those components to a pipeline, we can use the `read` and `apply` methods for the
first and second component respectively:

```python title="pipeline.py"
-import pyarrow as pa
from fondant.pipeline import Pipeline

pipeline = Pipeline(
@@ -55,12 +55,10 @@

dataset = pipeline.read(
    ref=CreateData,
-    produces={"x": pa.int32(), "y": pa.int32()},
)

_ = dataset.apply(
    ref=AddNumber,
-    produces={"z": pa.int32()},
    arguments={"n": 1},
)
```
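As a side note for readers of this diff: the collapsed hunks hide CreateData's actual values and AddNumber's `transform` method. Below is a minimal stand-alone sketch of the same dataflow in plain pandas, outside Fondant; all data values and the transform logic are assumptions made for illustration, not taken from the PR.

```python
import pandas as pd

def create_data() -> pd.DataFrame:
    """Stand-in for CreateData: produces the x and y columns (values assumed)."""
    return pd.DataFrame({"x": [1, 2, 3], "y": [4, 5, 6]})

def add_number(dataframe: pd.DataFrame, n: int) -> pd.DataFrame:
    """Stand-in for AddNumber(n): assumed to produce z by adding n to x."""
    dataframe["z"] = dataframe["x"] + n
    return dataframe

# Chain the two steps, mirroring pipeline.read(...) followed by dataset.apply(...)
dataset = add_number(create_data(), n=1)
print(dataset.columns.tolist())  # ['x', 'y', 'z']
```

The resulting dataset carries all three columns, which is what the dynamic-fields example further down builds on.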
@@ -102,7 +100,11 @@ If we take the previous example, we can restrict the columns that are loaded by
by specifying the `x` column in the `consumes` argument:

```python title="pipeline.py"
-@lightweight_component(consumes={"x"})
+@lightweight_component(
+    consumes={
+        "x": pa.int32()
+    }
+)
class AddNumber(PandasTransformComponent):
    def __init__(self, n: int):
        self.n = n
@@ -118,5 +120,53 @@ datasets and want to avoid loading unnecessary data.
If you want to publish your component to the Fondant Hub, you will need to convert
it to a containerized component. See the [containerized component guide](../components/containerized_components.md) for more info.

-**Note:** Python based components also support defining dynamic fields by default. See the [dynamic fields guide](../components/component_spec.md#dynamic-fields) for more info
-on dynamic fields.
## Loading dynamic fields

You can also choose to load dynamic fields by setting the `additionalProperties` argument to `True` in the `consumes` argument.

This will allow you to define an arbitrary number of columns to be loaded when applying your component to the pipeline.

This can be useful in scenarios where we want to dynamically load fields from a dataset. For example, if we want to aggregate results
from multiple columns, we can define a component that loads specific columns from the previous component and then aggregates them.

Starting from the previous example, where we now have a dataset with `x`, `y` and `z` columns, we can define a component that aggregates
the `x` and `z` columns into a new column `score`:

```python
import pandas as pd
import pyarrow as pa
from fondant.component import PandasTransformComponent
from fondant.pipeline import lightweight_component

@lightweight_component(
    consumes={
        "additionalProperties": True
    },
    produces={"score": pa.int32()},
)
class AggregateResults(PandasTransformComponent):
    def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        # The consumed columns are only known at runtime, via `self.consumes`
        metrics = list(self.consumes.keys())
        # Average the consumed columns row-wise into a single score per row
        agg = dataframe[metrics].mean(axis=1)
        return agg.to_frame(name="score")

_ = dataset.apply(
ref=AggregateResults,
consumes={"x": pa.int32(), "z": pa.int32()},
)
```
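For intuition, the aggregation step can be sketched outside Fondant in plain pandas. The data values below are assumed for illustration, and a row-wise mean is used so that each row gets its own `score`:

```python
import pandas as pd

# Assumed sample data: the x, y and z columns produced by the earlier components
df = pd.DataFrame({"x": [1, 2, 3], "y": [7, 8, 9], "z": [3, 4, 5]})

# What list(self.consumes.keys()) would yield at runtime, given the
# apply-time mapping consumes={"x": ..., "z": ...}
metrics = ["x", "z"]

# Row-wise mean of the consumed columns, renamed to "score"
score_df = df[metrics].mean(axis=1).to_frame(name="score")
print(score_df["score"].tolist())  # [2.0, 3.0, 4.0]
```

Note that `y` is untouched: only the columns selected at `apply` time take part in the aggregation.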

This will aggregate the `x` and `z` columns into a new column `score`.

The main difference between the `consumes` argument in the `@lightweight_component` decorator and the `consumes` argument in the `apply` method is that the former defines the schema of the component, while the latter specifies which columns of the input dataset are passed to the component. Specifying `consumes` in `apply` therefore allows for more flexibility in defining the component's input than fixing the full schema in the decorator.

Refer to this [section](../components/component_spec.md#dynamic-fields) for more info
on dynamic fields.