From 318886adbfd170c2672da330348a3cdf58f2a00a Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Tue, 30 Jan 2024 14:24:52 +0100 Subject: [PATCH 1/2] update lightweight docs --- docs/components/lightweight_components.md | 57 +++++++++++++++++++++-- 1 file changed, 54 insertions(+), 3 deletions(-) diff --git a/docs/components/lightweight_components.md b/docs/components/lightweight_components.md index b1bc9a79..3da1ace8 100644 --- a/docs/components/lightweight_components.md +++ b/docs/components/lightweight_components.md @@ -102,7 +102,11 @@ If we take the previous example, we can restrict the columns that are loaded by by specifying the `x` column in the `consumes` argument: ```python title="pipeline.py" -@lightweight_component(consumes={"x"}) +@lightweight_component( + consumes={ + "x": pa.int32() + } +) class AddNumber(PandasTransformComponent): def __init__(self, n: int): self.n = n @@ -118,5 +122,52 @@ datasets and want to avoid loading unnecessary data. If you want to publish your component to the Fondant Hub, you will need to convert it to containerized component. See the [containerized component guide](../components/containerized_components.md) for more info. -**Note:** Python based components also support defining dynamic fields by default. See the [dynamic fields guide](../components/component_spec.md#dynamic-fields) for more info -on dynamic fields. +## Loading dynamic fields + +You can also choose to load in dynamic fields by setting the `additionalProperties` argument to `True` in the `consumes` argument. + +This will allow you to define an arbitrary number of columns to be loaded when applying your component to the pipeline. + +This can be useful in scenarios where we want to dynamically load in fields from a dataset. For example, if we want to aggregate results +from multiple columns, we can define a component that loads in specific columns from the previous component and then aggregates them. 
+ +Starting from the previous example where we now have a dataset with `x`, `y` and `z` columns, we can define a component that aggregates +the `x` and `z` columns into a new column `score`: + +```python +import dask.dataframe as dd +from fondant.component import PandasTransformComponent + +@lightweight_component( + consumes={ + "additionalProperties": True + } +) +class AggregateResults(PandasTransformComponent): + def __init__(self): + pass + + def transform(self, dataframe: dd.DataFrame) -> dd.DataFrame: + metrics = list(self.consumes.keys()) + agg = dataframe[metrics].mean() + agg_df = agg.to_frame(name="score") + + return agg_df + +_ = dataset.apply( + ref=AggregateResults, + consumes={"x": pa.int32(), "z": pa.int32()}, + produces={"score": pa.int32()}, +) +``` + +This will aggregate the `x` and `z` columns into a new column `score`. + +The main difference between the `consumes` argument in the `@lightweight_component` decorator and the `consumes` argument in the `apply` method is that the former is used to define the +schema of the component and the latter is used to specify the input data that will be passed to the component. + +Specifying the `consumes` argument in the `apply` method allows for more flexibility in defining the input schema of the component +compared to the `consumes` argument in the `@lightweight_component` decorator, which is used to define the schema of the component. + +Refer to this [section](../components/component_spec.md#dynamic-fields) for more info +on dynamic fields. 
\ No newline at end of file From cbb387089aba596f9e1a316c3cc254c8ed577253 Mon Sep 17 00:00:00 2001 From: Philippe Moussalli Date: Tue, 30 Jan 2024 17:11:44 +0100 Subject: [PATCH 2/2] update produces in decorator --- docs/components/lightweight_components.md | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/docs/components/lightweight_components.md b/docs/components/lightweight_components.md index 3da1ace8..67e6bb86 100644 --- a/docs/components/lightweight_components.md +++ b/docs/components/lightweight_components.md @@ -14,8 +14,9 @@ from fondant.component import DaskLoadComponent, PandasTransformComponent from fondant.pipeline import lightweight_component import dask.dataframe as dd import pandas as pd +import pyarrow as pa -@lightweight_component +@lightweight_component(produces={"x": pa.int32(), "y": pa.int32()}) class CreateData(DaskLoadComponent): def load(self) -> dd.DataFrame: df = pd.DataFrame( @@ -27,7 +28,7 @@ class CreateData(DaskLoadComponent): ) return dd.from_pandas(df, npartitions=1) -@lightweight_component +@lightweight_component(produces={"z": pa.int32()}) class AddNumber(PandasTransformComponent): def __init__(self, n: int): self.n = n @@ -45,7 +46,6 @@ To register those components to a pipeline, we can use the `read` and `apply` me first and second component respectively: ```python title="pipeline.py" -import pyarrow as pa from fondant.pipeline import Pipeline pipeline = Pipeline( @@ -55,12 +55,10 @@ pipeline = Pipeline( dataset = pipeline.read( ref=CreateData, - produces={"x": pa.int32(), "y": pa.int32()}, ) _ = dataset.apply( ref=AddNumber, - produces={"z": pa.int32()}, arguments={"n": 1}, ) ``` @@ -137,11 +135,13 @@ the `x` and `z` columns into a new column `score`: ```python import dask.dataframe as dd from fondant.component import PandasTransformComponent +from fondant.pipeline import lightweight_component @lightweight_component( consumes={ "additionalProperties": True - } + }, + produces={"score": pa.int32()}, 
) class AggregateResults(PandasTransformComponent): def __init__(self): @@ -157,7 +157,6 @@ class AggregateResults(PandasTransformComponent): _ = dataset.apply( ref=AggregateResults, consumes={"x": pa.int32(), "z": pa.int32()}, - produces={"score": pa.int32()}, ) ```