diff --git a/data_explorer/app/df_helpers/fields.py b/data_explorer/app/df_helpers/fields.py index 85ab1cb4a..6f5f68083 100644 --- a/data_explorer/app/df_helpers/fields.py +++ b/data_explorer/app/df_helpers/fields.py @@ -13,7 +13,7 @@ def get_fields_by_types( filtered_fields = [] for field, f_type in fields.items(): - if any(ftype in f_type.type.to_json()["type"] for ftype in field_types): + if any(ftype in f_type.type.to_dict()["type"] for ftype in field_types): filtered_fields.append(field) return filtered_fields diff --git a/data_explorer/app/pages/home.py b/data_explorer/app/pages/home.py index 55387ebd0..5b89fcbf3 100644 --- a/data_explorer/app/pages/home.py +++ b/data_explorer/app/pages/home.py @@ -52,7 +52,7 @@ def create_component_table(manifest: Manifest) -> str: component_name = manifest.component_id fields_with_schema = [ - (field_name, field_schema.type.to_json()["type"]) + (field_name, field_schema.type.to_dict()["type"]) for field_name, field_schema in fields.items() ] diff --git a/examples/sample_pipeline/data/sample.parquet b/examples/sample_pipeline/data/sample.parquet index 3b56f832b..4988861ae 100644 Binary files a/examples/sample_pipeline/data/sample.parquet and b/examples/sample_pipeline/data/sample.parquet differ diff --git a/examples/sample_pipeline/pipeline.py b/examples/sample_pipeline/pipeline.py index eff75db77..3c1ae60cc 100644 --- a/examples/sample_pipeline/pipeline.py +++ b/examples/sample_pipeline/pipeline.py @@ -26,7 +26,6 @@ arguments={ "dataset_uri": "/data/sample.parquet", "column_name_mapping": load_component_column_mapping, - "n_rows_to_load": 5, }, produces={"text_data": pa.string()}, ) diff --git a/src/fondant/core/component_spec.py b/src/fondant/core/component_spec.py index 64507ec81..43f76f122 100644 --- a/src/fondant/core/component_spec.py +++ b/src/fondant/core/component_spec.py @@ -94,8 +94,8 @@ def __init__( image: str, *, description: t.Optional[str] = None, - consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None, - produces: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None, + consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType, bool]]] = None, + produces: t.Optional[t.Dict[str, t.Union[str, pa.DataType, bool]]] = None, previous_index: t.Optional[str] = None, args: t.Optional[t.Dict[str, t.Any]] = None, tags: t.Optional[t.List[str]] = None, @@ -223,7 +223,7 @@ def consumes(self) -> t.Mapping[str, Field]: """The fields consumed by the component as an immutable mapping.""" return types.MappingProxyType( { - name: Field(name=name, type=Type.from_json(field)) + name: Field(name=name, type=Type.from_dict(field)) for name, field in self._specification.get("consumes", {}).items() if name != "additionalProperties" }, @@ -234,7 +234,7 @@ def produces(self) -> t.Mapping[str, Field]: """The fields produced by the component as an immutable mapping.""" return types.MappingProxyType( { - name: Field(name=name, type=Type.from_json(field)) + name: Field(name=name, type=Type.from_dict(field)) for name, field in self._specification.get("produces", {}).items() if name != "additionalProperties" }, @@ -368,7 +368,7 @@ def __init__( self._inner_produces: t.Optional[t.Mapping[str, Field]] = None self._outer_produces: t.Optional[t.Mapping[str, Field]] = None - def to_json(self) -> str: + def to_dict(self) -> dict: def _dump_mapping( mapping: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]], ) -> dict: @@ -378,15 +378,17 @@ def _dump_mapping( serialized_mapping: t.Dict[str, t.Any] = mapping.copy() for key, value in mapping.items(): if 
isinstance(value, pa.DataType): - serialized_mapping[key] = Type(value).to_json() + serialized_mapping[key] = Type(value).to_dict() return serialized_mapping - specification_dict = { + return { "specification": self._component_spec.specification, "consumes": _dump_mapping(self._mappings["consumes"]), "produces": _dump_mapping(self._mappings["produces"]), } + def to_json(self) -> str: + specification_dict = self.to_dict() return json.dumps(specification_dict) @classmethod @@ -397,7 +399,7 @@ def _parse_mapping( """Parse a json mapping to a Python mapping with Fondant types.""" for key, value in json_mapping.items(): if isinstance(value, dict): - json_mapping[key] = Type.from_json(value).value + json_mapping[key] = Type.from_dict(value).value return json_mapping return cls( diff --git a/src/fondant/core/manifest.py b/src/fondant/core/manifest.py index a4cb0d6d8..15ecf87d7 100644 --- a/src/fondant/core/manifest.py +++ b/src/fondant/core/manifest.py @@ -188,7 +188,7 @@ def fields(self) -> t.Mapping[str, Field]: { name: Field( name=name, - type=Type.from_json(field), + type=Type.from_dict(field), location=field["location"], ) for name, field in self._specification["fields"].items() @@ -208,7 +208,7 @@ def add_or_update_field(self, field: Field, overwrite: bool = False): else: self._specification["fields"][field.name] = { "location": field.location, - **field.type.to_json(), + **field.type.to_dict(), } def _add_or_update_index(self, field: Field, overwrite: bool = True): diff --git a/src/fondant/core/schema.py b/src/fondant/core/schema.py index 62ca4f02c..953cd9167 100644 --- a/src/fondant/core/schema.py +++ b/src/fondant/core/schema.py @@ -136,7 +136,7 @@ def list(cls, data_type: t.Union[str, pa.DataType, "Type"]) -> "Type": ) @classmethod - def from_json(cls, json_schema: dict): + def from_dict(cls, json_schema: dict): """ Creates a new `Type` instance based on a dictionary representation of the json schema of a data type (https://swagger.io/docs/specification/data-models/data-types/). @@ -150,12 +150,12 @@ def from_json(cls, json_schema: dict): if json_schema["type"] == "array": items = json_schema["items"] if isinstance(items, dict): - return cls.list(cls.from_json(items)) + return cls.list(cls.from_dict(items)) return None return cls(json_schema["type"]) - def to_json(self) -> dict: + def to_dict(self) -> dict: """ Converts the `Type` instance to its JSON representation. 
@@ -165,7 +165,7 @@ def to_json(self) -> dict: if isinstance(self.value, pa.ListType): items = self.value.value_type if isinstance(items, pa.DataType): - return {"type": "array", "items": Type(items).to_json()} + return {"type": "array", "items": Type(items).to_dict()} type_ = None for type_name, data_type in _TYPES.items(): diff --git a/src/fondant/pipeline/lightweight_component.py b/src/fondant/pipeline/lightweight_component.py index d8280ba89..afeed47ee 100644 --- a/src/fondant/pipeline/lightweight_component.py +++ b/src/fondant/pipeline/lightweight_component.py @@ -2,7 +2,7 @@ import itertools import textwrap import typing as t -from dataclasses import dataclass +from dataclasses import asdict, dataclass from functools import wraps from fondant.component import BaseComponent, Component @@ -19,6 +19,9 @@ def __post_init__(self): # TODO: link to Fondant version self.base_image = "fondant:latest" + def to_dict(self): + return asdict(self) + class PythonComponent(BaseComponent): @classmethod diff --git a/src/fondant/pipeline/pipeline.py b/src/fondant/pipeline/pipeline.py index 18d16a111..0711c9f34 100644 --- a/src/fondant/pipeline/pipeline.py +++ b/src/fondant/pipeline/pipeline.py @@ -332,18 +332,23 @@ def get_nested_dict_hash(input_dict): hash_object = hashlib.md5(sorted_json_string.encode()) # nosec return hash_object.hexdigest() - component_spec_dict = self.component_spec.specification + operation_spec_dict = self.operation_spec.to_dict() + image_dict = self.image.to_dict() + arguments = ( get_nested_dict_hash(self.arguments) if self.arguments is not None else None ) component_op_uid_dict = { - "component_spec_hash": get_nested_dict_hash(component_spec_dict), + "operation_spec_hash": get_nested_dict_hash(operation_spec_dict), + "image": get_nested_dict_hash(image_dict), "arguments": arguments, "input_partition_rows": self.input_partition_rows, "number_of_accelerators": self.resources.accelerator_number, "accelerator_name": self.resources.accelerator_name, "node_pool_name": self.resources.node_pool_name, + "cluster_type": self.cluster_type, + "client_kwargs": self.client_kwargs, } if previous_component_cache is not None: diff --git a/tests/core/test_schema.py b/tests/core/test_schema.py index 08d9dc969..f442835cd 100644 --- a/tests/core/test_schema.py +++ b/tests/core/test_schema.py @@ -10,8 +10,8 @@ def test_valid_type(): assert Type("int8").value == pa.int8() assert Type.list(Type("int8")).value == pa.list_(pa.int8()) assert Type.list(Type.list(Type("string"))).value == pa.list_(pa.list_(pa.string())) - assert Type("int8").to_json() == {"type": "int8"} - assert Type.list("float32").to_json() == { + assert Type("int8").to_dict() == {"type": "int8"} + assert Type.list("float32").to_dict() == { "type": "array", "items": {"type": "float32"}, } @@ -19,11 +19,11 @@ def test_valid_type(): def test_valid_json_schema(): """Test that Type class initialized with a json schema matches the expected pyarrow schema.""" - assert Type.from_json({"type": "string"}).value == pa.string() - assert Type.from_json( + assert Type.from_dict({"type": "string"}).value == pa.string() + assert Type.from_dict( {"type": "array", "items": {"type": "int8"}}, ).value == pa.list_(pa.int8()) - assert Type.from_json( + assert Type.from_dict( {"type": "array", "items": {"type": "array", "items": {"type": "int8"}}}, ).value == pa.list_(pa.list_(pa.int8())) @@ -32,12 +32,12 @@ def test_valid_json_schema(): "statement", [ 'Type("invalid_type")', - 'Type("invalid_type").to_json()', + 'Type("invalid_type").to_dict()', 
'Type.list(Type("invalid_type"))', - 'Type.list(Type("invalid_type")).to_json()', - 'Type.from_json({"type": "invalid_value"})', - 'Type.from_json({"type": "invalid_value", "items": {"type": "int8"}})', - 'Type.from_json({"type": "array", "items": {"type": "invalid_type"}})', + 'Type.list(Type("invalid_type")).to_dict()', + 'Type.from_dict({"type": "invalid_value"})', + 'Type.from_dict({"type": "invalid_value", "items": {"type": "int8"}})', + 'Type.from_dict({"type": "array", "items": {"type": "invalid_type"}})', ], ) def test_invalid_json_schema(statement): diff --git a/tests/pipeline/test_compiler.py b/tests/pipeline/test_compiler.py index 1774ba8d7..48d558ff1 100644 --- a/tests/pipeline/test_compiler.py +++ b/tests/pipeline/test_compiler.py @@ -540,7 +540,7 @@ def test_invalid_vertex_configuration(tmp_path_factory): def test_caching_dependency_docker(tmp_path_factory): - """Test that the component cache key changes when a depending component cache key change for + """Test that the component cache key changes when a dependant component cache key change for the docker compiler. """ arg_list = ["dummy_arg_1", "dummy_arg_2"] diff --git a/tests/pipeline/test_python_component.py b/tests/pipeline/test_python_component.py index fb15291c4..502ad71fe 100644 --- a/tests/pipeline/test_python_component.py +++ b/tests/pipeline/test_python_component.py @@ -78,8 +78,10 @@ def load(self) -> dd.DataFrame: ) assert len(pipeline._graph.keys()) == 1 - operation_spec = pipeline._graph["CreateData"]["operation"].operation_spec.to_json() - assert json.loads(operation_spec) == { + operation_spec_dict = pipeline._graph["CreateData"][ + "operation" + ].operation_spec.to_dict() + assert operation_spec_dict == { "specification": { "name": "CreateData", "image": "python:3.8-slim-buster", @@ -106,11 +108,10 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame: consumes={"x": pa.int32(), "y": pa.int32()}, arguments={"n": 1}, ) - assert len(pipeline._graph.keys()) == 1 + 1 assert pipeline._graph["AddN"]["dependencies"] == ["CreateData"] - operation_spec = pipeline._graph["AddN"]["operation"].operation_spec.to_json() - assert json.loads(operation_spec) == { + operation_spec_dict = pipeline._graph["AddN"]["operation"].operation_spec.to_dict() + assert operation_spec_dict == { "specification": { "name": "AddN", "image": "fondant:latest",