Start from dataset schema for lightweight python component consumes
#789
Conversation
src/fondant/pipeline/pipeline.py (outdated diff)
```python
consumes_spec = {k: v.type.to_json() for k, v in self.fields.items()}
if consumes:
    for k, v in consumes.items():
        consumes_spec[k] = consumes_spec[v]
```
This needs more checks and might have to be moved to a better place. Just wanted to get a PoC working.
Would you then add a `consumes` section to the decorator? Is there a way to perhaps infer it directly from the component script? For example:

```python
def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
    dataframe["a"] = dataframe["a"].map(lambda x: x + self.n)
    dataframe["b"] = dataframe["b"].map(lambda x: x * self.n)
    return dataframe
```

We could parse the script text and infer that both fields `a` and `b` are consumed.
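For what it's worth, a naive sketch of such static inference using Python's `ast` module (the function name and heuristics here are hypothetical, purely to illustrate the idea):

```python
import ast

SOURCE = '''
def transform(self, dataframe):
    dataframe["a"] = dataframe["a"].map(lambda x: x + self.n)
    dataframe["b"] = dataframe["b"].map(lambda x: x * self.n)
    return dataframe
'''

def infer_consumed_fields(source: str) -> list:
    """Collect string keys used to subscript a variable named `dataframe`.

    Note: this only catches direct literal subscripts (and doesn't distinguish
    reads from writes); aliased dataframes, helper functions, or computed keys
    would slip through, which is why static inference is brittle in general.
    """
    fields = set()
    for node in ast.walk(ast.parse(source)):
        if (
            isinstance(node, ast.Subscript)
            and isinstance(node.value, ast.Name)
            and node.value.id == "dataframe"
            and isinstance(node.slice, ast.Constant)
            and isinstance(node.slice.value, str)
        ):
            fields.add(node.slice.value)
    return sorted(fields)

print(infer_consumed_fields(SOURCE))  # ['a', 'b']
```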
That was my idea, yes. It should be optional though, keeping the standard case simple.
I don't think this is feasible. If you just look at our own components, it's clear that there's a huge number of cases to consider. Fields could be accessed in other functions, within …
If so, it could look like this for the component in the sample pipeline:

```python
@lightweight_component(consumes={"chunk": pa.string()})
class CalculateChunkLength(PandasTransformComponent):
    def __init__(self, feature_name: str, **kwargs):
        self.feature_name = feature_name

    def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        dataframe[self.feature_name] = dataframe["chunk"].apply(len)
        return dataframe

_ = dataset.apply(
    ref=CalculateChunkLength,
    consumes={"chunk": "text"},
    produces={"chunk_length": pa.int32()},
    arguments={"feature_name": "chunk_length"},
)
```

It might not have to contain the whole schema, since we can get that from the dataset schema. It could just be a list of the fields to consume:

```python
@lightweight_component(consumes=["chunk"])
class CalculateChunkLength(PandasTransformComponent):
    def __init__(self, feature_name: str, **kwargs):
        self.feature_name = feature_name

    def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        dataframe[self.feature_name] = dataframe["chunk"].apply(len)
        return dataframe

_ = dataset.apply(
    ref=CalculateChunkLength,
    consumes={"chunk": "text"},
    produces={"chunk_length": pa.int32()},
    arguments={"feature_name": "chunk_length"},
)
```

We would just lose the ability to validate the types between these components. Not sure what would be most straightforward for the user.
The decorator feels like the right place for this. Thinking about the user journey here, people will probably start to implement PythonComponents and at some point try to share them. I think the decorator could become the counterpart to the `fondant_component.yaml`. Maybe this makes building a reusable component really easy: use the decorator information to generate a component spec, use the image as base image, and the Python code as the component main. This would help us explain the concepts easily as well: the decorator is the counterpart to the component spec, the source code the counterpart to the component functionality. But I would indeed keep this optional.

When people start implementing things, it is hard to believe they will immediately test with huge datasets. I guess we don't have to aim for efficient execution in the development phase when people are using small datasets. Do you see any disadvantages to using the decorator for it?
```python
_ = dataset.apply(
    ref=CalculateChunkLength,
    consumes={"chunk": "text"},
    produces={"chunk_length": pa.int32()},
    arguments={"feature_name": "chunk_length"},
)
```

Not sure if this is needed. When people use the decorator as a component spec, they might start naming the consumes and produces in the decorator itself so that it fits the available column names. I think the consumes and produces isn't needed in the …
If this were the case, I would rather keep it in the … But I agree with your other points.
Force-pushed from a2be107 to 0d2ba12
Force-pushed from 0d2ba12 to d898e4a
Updated the PR based on the discussions above.
Thanks @PhilippeMoussalli. Left a few small comments. Still want to test the changes.
```diff
-@lightweight_component()
+@lightweight_component(consumes="generic")
```
I would prefer to have an empty consumes instead of passing "generic" here.

For me it would be fine to consume the whole dataset. It would make usage less complex, even if it reduces the efficiency of the component execution. We should keep the base interface as simple as possible. Pipeline improvements will probably follow later during the development cycle.

I think we don't want to use the term "generic component". What is the issue when we pass `None` instead of the string?
I think the issue is that there are still 3 general options that should be supported, without the possibility to mix some of them together:

- consumes is `None` -> non-generic component without specified fields -> consume all the fields in the dataset schema. Does not have an equivalent in the component spec.
- consumes is specified as a `list` -> non-generic component with specified fields -> start from the fields in the dataset schema and filter based on the fields specified in the list. Equivalent to having specific fields in the component spec.
- consumes == `"generic"` -> generic component that allows defining dynamic fields in the `apply` produces/consumes -> the fields to consume are the ones specified in the `apply` section. Equivalent to setting `additionalProperties` to `true` in the component spec.

The only solution would be to somehow mix the 1st and 3rd option, but this would require us to change the component spec to support both dynamic and specified fields, which is not something we currently support.
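For illustration, the three options could be sketched as a single dispatch (the helper name and the simplified string-valued types are hypothetical, not the actual Fondant implementation):

```python
from typing import Dict, List, Optional, Union

def resolve_consumes_spec(
    decorator_consumes: Union[None, List[str], str],
    dataset_schema: Dict[str, str],
    apply_consumes: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    if decorator_consumes is None:
        # Option 1: non-generic, no fields specified -> consume the whole dataset schema
        return dict(dataset_schema)
    if isinstance(decorator_consumes, list):
        # Option 2: non-generic with specified fields -> filter the dataset schema
        return {k: v for k, v in dataset_schema.items() if k in decorator_consumes}
    if decorator_consumes == "generic":
        # Option 3: generic -> fields are whatever the `apply` call maps in,
        # equivalent to additionalProperties: true in the component spec
        return {k: dataset_schema[v] for k, v in (apply_consumes or {}).items()}
    raise ValueError(f"Unsupported consumes: {decorator_consumes!r}")

schema = {"x": "int32", "y": "int32", "z": "int32"}
print(resolve_consumes_spec(None, schema))                   # all fields
print(resolve_consumes_spec(["x"], schema))                  # {'x': 'int32'}
print(resolve_consumes_spec("generic", schema, {"a": "x"}))  # {'a': 'int32'}
```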
@mrchtr updated based on the feedback: `consumes=None` now simply means consuming all fields from previous components. We lose the concept of `additionalProperties` for the `consumes` section in Python-based components. However, I don't think it would be required, since we're inferring the fields based on the dataset schema.
This is more evident here:

```python
@lightweight_component(
    base_image="python:3.8-slim-buster",
    extra_requires=["pandas", "dask"],
)
class CreateData(DaskLoadComponent):
    def load(self) -> dd.DataFrame:
        ...

dataset = pipeline.read(
    ref=CreateData,
    produces={"x": pa.int32(), "y": pa.int32(), "z": pa.int32()},
)
# dataset schema has x, y, z

@lightweight_component
class AddN(PandasTransformComponent):
    def __init__(self, n: int, **kwargs):
        self.n = n

    def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        dataframe["x"] = dataframe["x"].map(lambda x: x + self.n)
        return dataframe

_ = dataset.apply(
    ref=AddN,
    produces={"x": pa.int32(), "y": pa.int32(), "z": pa.int32()},
    consumes=None,  # has to be None since we can't define dynamic fields, but we can already infer the schema based on the dataset
    arguments={"n": 1},
)
```
I think both options are valuable, with small tradeoffs:

- The previous one would not require any changes to the `consumes` argument of the `apply` method, but has a slightly more complex interface for the lightweight component.
- This approach offers more flexibility in the lightweight component, but a slightly different way of defining `consumes` compared to reusable components.
Happy to hear other takes on this @RobbeSneyders @GeorgesLorre
Agree with not supporting `consumes="generic"`. The `additionalProperties: true` is only needed because the user cannot easily change the schema of a reusable component. But they can easily change the schema of a lightweight one, so there's no need for it here.

Lightweight Python components can still be implemented in a generic way without it. It just means that the implementation of the component depends on the `consumes` argument it receives.
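A hypothetical illustration of what "the implementation depends on the `consumes` it receives" could look like, stripped of any Fondant base classes (plain pandas, invented class name):

```python
import pandas as pd

class LowercaseColumns:
    """Generic transform: which columns it touches depends entirely on the
    consumes it was configured with, not on hardcoded column names."""

    def __init__(self, consumes):
        self.consumes = consumes  # list of column names to operate on

    def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        for column in self.consumes:
            dataframe[column] = dataframe[column].str.lower()
        return dataframe

df = pd.DataFrame({"title": ["Hello"], "body": ["World"], "id": [1]})
out = LowercaseColumns(consumes=["title", "body"]).transform(df)
print(out["title"][0], out["body"][0])  # hello world
```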
Force-pushed from 95a5a72 to 0619c41
Force-pushed from 0619c41 to 4c97282
Thanks @PhilippeMoussalli!
Left some comments on the structure of the code.
```python
def modify_consumes_spec(apply_consumes, consumes_spec):
```
Isn't this functionality already available in the `OperationSpec` class? I would try to keep the implementation in line with the custom components:

- Build the consumes specification based on the decorator `consumes`
- Leverage both the consumes specification and the consumes apply argument in the `OperationSpec`
So the flow currently goes like this: get the `consumes_spec` (remap names if necessary and filter) -> pass it to the `consumes` section of the component spec (actual consumed fields) -> build the `ComponentOp`.

The reason why we're modifying things in the decorator is that the component spec should already have the mapped names. In reusable components this is usually the case, but here it's a bit different.

Let's take an example where we have a dataset with a field `x` and the next component accepts a field `a`. The component spec of the reusable component would already have the field `a` in its schema. For lightweight components, we start from the dataset schema (`x`), which needs to be remapped to `a` before being passed to the component spec and then the `ComponentOp`.

To replicate the issue, you can comment out the function `modify_consumes_spec` and run this test. By commenting out the mapping and filter functions:
```python
@classmethod
def get_consumes_spec(
    cls,
    dataset_fields: t.Mapping[str, Field],
    apply_consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]],
):
    python_component_consumes = cls.consumes()

    # Get consumes spec from the dataset
    consumes_spec = {k: v.type.to_dict() for k, v in dataset_fields.items()}

    # Modify naming based on the consumes argument in the `apply` method
    # consumes_spec = cls.modify_consumes_spec(apply_consumes, consumes_spec)

    # Filter for values that are not in the user defined consumes list
    # consumes_spec = cls.filter_consumes_spec(
    #     python_component_consumes,
    #     consumes_spec,
    # )

    return consumes_spec
```

you get:

```
E fondant.core.exceptions.InvalidPipelineDefinition: Received a string value for key `a` in the `consumes` argument passed to the operation, but `a` is not defined in the `consumes` section of the component spec.
```
The flow I suggest is the following:

```python
# First build the equivalent of the component spec
consumes_spec = create_consumes_spec(dataset_fields, python_component_consumes)
component_spec = ComponentSpec(consumes_spec, ...)

# Then apply the same logic as for containerized components
operation_spec = OperationSpec(component_spec, apply_consumes)
```
I think the issue will still persist:

- The component spec should always contain the names of the columns that will be consumed by the component.
- This was not an issue before, since we had it explicitly written in a yaml.
- Now we are starting from the dataset schema/fields, which may have different names than the ones expected by the component, thus we need to dynamically infer the component spec.
- The `dataset_fields` and `python_component_consumes` (which just contains a list of columns to consume) alone are not enough to get the ground-truth component spec. For that, we need to remap it using the `apply` consumes before passing it to the `OperationSpec`, in order to get to the actual component spec expected by the component.

For example, starting from dataset fields `x`, `y` and assuming `python_component_consumes` is `["x"]`, we cannot infer the component spec for the next component, which may expect `z` -> leads to errors.

I think that's the main reason why we need to include some custom logic beforehand.
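To make the `x` -> `a` example concrete, here is a minimal sketch of the remap-then-filter step (the function names mirror the ones discussed, but the bodies are illustrative, not the actual Fondant code):

```python
# `consumes_spec` maps field names to (simplified) types; `apply_consumes`
# maps component field names to dataset field names, e.g. {"a": "x"}.

def remap_consumes_spec(apply_consumes, consumes_spec):
    """Rename dataset fields to the names the component expects."""
    spec = dict(consumes_spec)
    for component_name, dataset_name in (apply_consumes or {}).items():
        if isinstance(dataset_name, str) and dataset_name in spec:
            spec[component_name] = spec.pop(dataset_name)
    return spec

def filter_consumes_spec(component_consumes, consumes_spec):
    """Keep only the fields the component declared it consumes."""
    if component_consumes is None:
        return consumes_spec
    return {k: v for k, v in consumes_spec.items() if k in component_consumes}

dataset_spec = {"x": "string", "y": "string"}
remapped = remap_consumes_spec({"a": "x"}, dataset_spec)
print(remapped)                               # {'y': 'string', 'a': 'string'}
print(filter_consumes_spec(["a"], remapped))  # {'a': 'string'}
```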
```python
class PythonComponentOp(cls, PythonComponent):
    def image(cls) -> Image:
        return image

    @classmethod
```
You can make this a class property by combining the `classmethod` and `property` decorators.
It should be reversed, but it seems like it only works for Python 3.9 and 3.10 (docs).
Just making it a class attribute could work as well:

```python
class BaseClass:
    consumes: ConsumesType

class Class(BaseClass):
    consumes = consumes_  # cannot be the same name
```
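An illustrative sketch of that class-attribute approach inside a decorator (simplified; not the actual fondant decorator), showing why the captured name cannot shadow the attribute name in the class body:

```python
def lightweight_component(consumes=None):
    def wrapper(cls):
        consumes_ = consumes  # rebind: the class body below cannot reuse the name

        class AppliedComponent(cls):
            # `consumes = consumes` would raise NameError here, since the
            # assignment makes `consumes` a class-body name before the RHS
            # could resolve to the enclosing function's variable.
            consumes = consumes_

        return AppliedComponent
    return wrapper

@lightweight_component(consumes=["chunk"])
class MyComponent:
    pass

print(MyComponent.consumes)  # ['chunk']
```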
Hmm, I still don't quite follow what should be done here. Aren't the class methods needed for the decorators? What's the need for attributes in this example?
```python
def from_ref(
    cls,
    ref: t.Any,
    fields: t.Optional[t.Mapping[str, Field]] = None,
```
I would prefer to keep this fields argument out of here, since it is specific to the lightweight Python components. Can we move this to the `PythonComponent` class instead?
I don't see a straightforward way of doing this, unless we somehow pass the fields to the `BaseComponent` class, since this is what the `PythonComponent` starts from, but I'm also not sure how feasible that is. Any other suggestions?
My original implementation did this in the `Dataset` class. So I would assume we can just call a method on the `PythonComponent` class at that point?
Ok, this might make less sense after the refactoring on main since my first commits. If we can address my comment above, it's fine for me to keep it like this for now. Would be good to add the argument to the docstring though.
Force-pushed from f16dc2a to 3943c4b
Co-authored-by: Robbe Sneyders <[email protected]>
Force-pushed from 1def2c3 to d8e5563
```python
    }
    pipeline._validate_pipeline_definition(run_id="dummy-run-id")

    DockerCompiler().compile(pipeline)


def test_consumes_mapping_all_fields(tmp_path_factory, load_pipeline):
```
@RobbeSneyders this test is still failing. We are facing the same issue as discussed before when loading all the fields (the consumes spec needs to be remapped based on the `apply` consumes before passing it to the `ComponentOp`). Any other way of tackling this?

The rest of the cases (generic and specific consumes below) seem to work well.
The more I think about it, the less sense it makes to me to just pass in the default dataset schema. It doesn't seem to fit nicely with the current paradigm we have.
I see two options:

- We accept the current behavior: you can't remap field names in the `apply` `consumes` if the component doesn't explicitly define `consumes` in the decorator.
- As you mention, include the remapping when calculating the default `consumes`. I think this is less of an issue than before, since this no longer happens if the `consumes` is specified explicitly in the decorator.

Any other ideas? If not, I would probably vote for the second one mentioned above.
Indeed, the second option seems to be the most logical for now, will update it
Thanks @PhilippeMoussalli.
Looks good to me, but I can't approve since this was originally my PR 😅
Thanks @PhilippeMoussalli. Found some minor things.
```python
    pass

    @classmethod
    def modify_consumes_spec(cls, apply_consumes, consumes_spec):
```
Nitpick: can we add typing here too?
```python
    return consumes_spec

    @classmethod
    def get_consumes_spec(
```
I think a docstring would be good.
```python
# Get consumes spec from the dataset
spec_consumes = {k: v.type.to_dict() for k, v in dataset_fields.items()}

spec_consumes = cls.modify_consumes_spec(apply_consumes, spec_consumes)
```
Nitpick: I would use consistent naming, so either `consumes_spec` or `spec_consumes`. Maybe inside `modify_consumes_spec` we can use something like `dataset_consumes` to make it even clearer.
```python
else:
    msg = (
        f"Invalid data type for field `{k}` in the `apply_consumes` "
        f"argument. Only string and pa.DataType are allowed."
    )
```
If I see it correctly, we are not checking whether the type is `pa.DataType`?
Good catch! Only strings are allowed in case no `consumes` is passed on the decorator level.
Force-pushed from a904fe5 to 5b69298
Update lightweight docs based on #789
Fixes #785
Opening this as a draft PR since it's not yet clear to me what the desired behavior is.

I'll be using the "inner" / "outer" terminology which we already use in our `OperationSpec` class to explain. "Inner" schemas are the schemas that the Python component consumes / produces. "Outer" schemas are the schemas that the `DataIO` layer consumes / produces.

For docker components, the logic works as follows:

- The `consumes` section in the component spec is the "inner" schema.
- We use the `consumes` argument of the `apply` method to calculate the "outer" schema from the "inner" schema.

For lightweight Python components, we do not have a component spec to start from. So what I currently implemented is this:

- We use the dataset schema as the "inner" schema.
- We use the `consumes` argument of the `apply` method to calculate the "outer" schema from the "inner" schema.

This works, but has one big downside. Since we start from the dataset schema, the calculated "inner" / "outer" consumes contain all the fields in the dataset. In other words, the lack of a component spec removes the ability to select which columns from the dataset to load. Since this is an important part of our optimization, I think we need to find a way around this.

My best idea at this time is to expand the `lightweight_component` decorator to add support for this. But curious to hear if anyone has other ideas.
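The inner-to-outer step described above can be sketched as a tiny helper (the function name and the simplified string-typed schema are hypothetical, not the actual `OperationSpec` code):

```python
def outer_schema(inner_schema, apply_consumes=None):
    """Derive the "outer" (DataIO-level) schema from the "inner"
    (component-level) schema, using the `consumes` mapping passed to
    `apply`, which maps inner field names to outer field names."""
    apply_consumes = apply_consumes or {}
    return {
        apply_consumes.get(inner_name, inner_name): dtype
        for inner_name, dtype in inner_schema.items()
    }

# e.g. the component consumes "chunk", which the dataset stores as "text"
inner = {"chunk": "string", "id": "int32"}
print(outer_schema(inner, {"chunk": "text"}))  # {'text': 'string', 'id': 'int32'}
```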