Skip to content

Commit

Permalink
Update core package (#653)
Browse files Browse the repository at this point in the history
First PR related to the data structure redesign. 

Implements the following: 
- New manifest structure (including validation, and evolution)
- New ComponentSpec structure (including validation)
- Removes `Subsets` and `Index`

Not all tests are passing yet, but these are already quite a few
changes. Therefore, I've created a PR on the feature branch
`feature/redesign-dataset-format-and-interface` to have quicker
feedback loops.

---------

Co-authored-by: Robbe Sneyders <[email protected]>
Co-authored-by: Philippe Moussalli <[email protected]>
  • Loading branch information
3 people authored Nov 23, 2023
1 parent 5dd2006 commit 06186c1
Show file tree
Hide file tree
Showing 90 changed files with 745 additions and 1,116 deletions.
58 changes: 8 additions & 50 deletions src/fondant/core/component_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,34 +66,6 @@ def kubeflow_type(self) -> str:
return lookup[self.type]


class ComponentSubset:
    """
    A single subset declared in a Fondant component specification.

    Args:
        specification: the part of the component json representing the subset
    """

    def __init__(self, specification: t.Dict[str, t.Any]) -> None:
        self._specification = specification

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        return f"{class_name}({self._specification!r})"

    @property
    def fields(self) -> t.Mapping[str, Field]:
        """The entries under the ``fields`` key, parsed into a read-only mapping of `Field`s."""
        parsed = {}
        for field_name, field_json in self._specification["fields"].items():
            parsed[field_name] = Field(
                name=field_name,
                type=Type.from_json(field_json),
            )
        return types.MappingProxyType(parsed)

    @property
    def additional_fields(self) -> bool:
        """Value of the ``additionalFields`` key of the specification; ``True`` when absent."""
        return self._specification.get("additionalFields", True)


class ComponentSpec:
"""
Class representing a Fondant component specification.
Expand Down Expand Up @@ -190,39 +162,25 @@ def tags(self) -> t.List[str]:
return self._specification.get("tags", None)

@property
def index(self):
    """Return a ``ComponentSubset`` built from a specification with an empty ``fields`` dict."""
    empty_specification = {"fields": {}}
    return ComponentSubset(empty_specification)

@property
def consumes(self) -> t.Mapping[str, ComponentSubset]:
"""The subsets consumed by the component as an immutable mapping."""
def consumes(self) -> t.Mapping[str, Field]:
"""The fields consumed by the component as an immutable mapping."""
return types.MappingProxyType(
{
name: ComponentSubset(subset)
for name, subset in self._specification.get("consumes", {}).items()
if name != "additionalSubsets"
name: Field(name=name, type=Type.from_json(field))
for name, field in self._specification.get("consumes", {}).items()
},
)

@property
def produces(self) -> t.Mapping[str, ComponentSubset]:
"""The subsets produced by the component as an immutable mapping."""
def produces(self) -> t.Mapping[str, Field]:
"""The fields produced by the component as an immutable mapping."""
return types.MappingProxyType(
{
name: ComponentSubset(subset)
for name, subset in self._specification.get("produces", {}).items()
if name != "additionalSubsets"
name: Field(name=name, type=Type.from_json(field))
for name, field in self._specification.get("produces", {}).items()
},
)

@property
def accepts_additional_subsets(self) -> bool:
return self._specification.get("consumes", {}).get("additionalSubsets", True)

@property
def outputs_additional_subsets(self) -> bool:
return self._specification.get("produces", {}).get("additionalSubsets", True)

@property
def args(self) -> t.Mapping[str, Argument]:
args = self.default_arguments
Expand Down
239 changes: 103 additions & 136 deletions src/fondant/core/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pkgutil
import types
import typing as t
from collections import OrderedDict
from dataclasses import asdict, dataclass
from pathlib import Path

Expand All @@ -18,59 +19,6 @@
from fondant.core.schema import Field, Type


class Subset:
    """
    A subset entry of a Fondant manifest.

    Args:
        specification: The part of the manifest json representing the subset
        base_path: The base path which the subset location is defined relative to
    """

    def __init__(self, specification: dict, *, base_path: str) -> None:
        self._specification = specification
        self._base_path = base_path

    @property
    def location(self) -> str:
        """The base path concatenated with the location stored in the specification."""
        relative_location = self._specification["location"]
        return self._base_path + relative_location

    @property
    def fields(self) -> t.Mapping[str, Field]:
        """The fields of the subset, parsed into a read-only mapping keyed by field name."""
        parsed = {}
        for field_name, field_json in self._specification["fields"].items():
            parsed[field_name] = Field(
                name=field_name,
                type=Type.from_json(field_json),
            )
        return types.MappingProxyType(parsed)

    def add_field(self, name: str, type_: Type, *, overwrite: bool = False) -> None:
        """
        Store the json form of ``type_`` under ``name`` in the subset's fields.

        Raises:
            ValueError: if ``name`` is already present and ``overwrite`` is falsy
        """
        name_is_taken = not overwrite and name in self._specification["fields"]
        if name_is_taken:
            msg = f"A field with name {name} already exists"
            raise ValueError(msg)

        self._specification["fields"][name] = type_.to_json()

    def remove_field(self, name: str) -> None:
        """Delete the field stored under ``name`` from the subset's fields."""
        stored_fields = self._specification["fields"]
        del stored_fields[name]

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        return f"{class_name}({self._specification!r})"


class Index(Subset):
    """Special case of a subset for the index, which has fixed fields."""

    @property
    def fields(self) -> t.Dict[str, Field]:
        """The fixed ``id`` and ``source`` fields, both of type ``string``."""
        return {
            field_name: Field(name=field_name, type=Type("string"))
            for field_name in ("id", "source")
        }


@dataclass
class Metadata:
"""
Expand Down Expand Up @@ -171,8 +119,8 @@ def create(

specification = {
"metadata": metadata.to_dict(),
"index": {"location": f"/{pipeline_name}/{run_id}/{component_id}/index"},
"subsets": {},
"index": {"location": f"/{component_id}"},
"fields": {},
}
return cls(specification)

Expand All @@ -196,13 +144,55 @@ def copy(self) -> "Manifest":
def metadata(self) -> t.Dict[str, t.Any]:
return self._specification["metadata"]

@property
def index(self) -> Field:
    """The manifest index as a `Field` named ``Index`` carrying the stored index location."""
    index_location = self._specification["index"]["location"]
    return Field(name="Index", location=index_location)

def update_metadata(self, key: str, value: t.Any) -> None:
    """Store ``value`` under ``key`` in the manifest metadata."""
    metadata = self.metadata
    metadata[key] = value

@property
def base_path(self) -> str:
    """The ``base_path`` entry of the manifest metadata."""
    metadata = self.metadata
    return metadata["base_path"]

@property
def field_mapping(self) -> t.Mapping[str, t.List[str]]:
    """
    Group the index and all manifest fields by the location they are stored at.

    Every key is built as ``{base_path}/{pipeline_name}/{run_id}{field.location}`` and maps
    to the list of field names found at that location. The index is registered under the
    name ``id``.

    The result is a read-only view on an OrderedDict whose first entry is the location
    containing ``id``, i.e. the dataframe with the index. This allows an efficient left
    join operation.

    Example:
        {
            "/base_path/component_1": ["Name", "HP"],
            "/base_path/component_2": ["Type 1", "Type 2"],
        }
    """
    names_by_location = {}
    all_fields = {"id": self.index, **self.fields}
    for field_name, field in all_fields.items():
        location = (
            f"{self.base_path}/{self.pipeline_name}/{self.run_id}{field.location}"
        )
        names_by_location.setdefault(location, []).append(field_name)

    # Put the location(s) holding the index first; within each group the original
    # insertion order is kept.
    with_index = [
        location for location, names in names_by_location.items() if "id" in names
    ]
    without_index = [
        location for location, names in names_by_location.items() if "id" not in names
    ]

    ordered_mapping = OrderedDict()
    for location in with_index + without_index:
        ordered_mapping[location] = names_by_location[location]

    return types.MappingProxyType(ordered_mapping)

@property
def run_id(self) -> str:
    """The ``run_id`` entry of the manifest metadata."""
    metadata = self.metadata
    return metadata["run_id"]
Expand All @@ -220,39 +210,61 @@ def cache_key(self) -> str:
return self.metadata["cache_key"]

@property
def index(self) -> Index:
return Index(self._specification["index"], base_path=self.base_path)

@property
def subsets(self) -> t.Mapping[str, Subset]:
"""The subsets of the manifest as an immutable mapping."""
def fields(self) -> t.Mapping[str, Field]:
"""The fields of the manifest as an immutable mapping."""
return types.MappingProxyType(
{
name: Subset(subset, base_path=self.base_path)
for name, subset in self._specification["subsets"].items()
name: Field(
name=name,
type=Type(field["type"]),
location=field["location"],
)
for name, field in self._specification["fields"].items()
},
)

def add_subset(
self,
name: str,
fields: t.Iterable[t.Union[Field, t.Tuple[str, Type]]],
) -> None:
if name in self._specification["subsets"]:
msg = f"A subset with name {name} already exists"
def add_or_update_field(self, field: Field, overwrite: bool = False):
    """
    Add a field to the manifest, or update it when it is already present.

    A field named ``index`` is forwarded to ``_add_or_update_index`` with
    ``overwrite=True``, regardless of the ``overwrite`` argument given here. Any other
    field is stored under its name with the location ``/{component_id}`` merged with the
    json form of its type.

    Raises:
        ValueError: if ``overwrite`` is ``False`` and a field with the same name exists
    """
    if field.name == "index":
        self._add_or_update_index(field, overwrite=True)
        return

    if overwrite is False and field.name in self._specification["fields"]:
        msg = (
            f"A field with name {field.name} already exists. Set overwrite to true, "
            f"if you want to update the field."
        )
        raise ValueError(msg)

    # The location comes first so that keys of the type json are appended after it.
    entry = {"location": f"/{self.component_id}"}
    entry.update(field.type.to_json())
    self._specification["fields"][field.name] = entry

def _add_or_update_index(self, field: Field, overwrite: bool = True):
    """
    Replace the manifest index entry with the location of ``field``.

    Args:
        field: field named ``index``; its ``location`` is stored, prefixed with ``/``
        overwrite: must not be ``False``, since this method always replaces the index entry

    Raises:
        ValueError: if ``overwrite`` is ``False`` or if ``field`` is not named ``index``
    """
    if overwrite is False:
        msg = (
            "The index already exists. Set overwrite to true, "
            "if you want to update the index."
        )
        raise ValueError(msg)

    if field.name != "index":
        # Adjacent literals are joined verbatim, so the first one must end with a space;
        # otherwise the message reads "set the fieldname to `index`".
        msg = (
            f"The field name is {field.name}. If you try to update the index, set the field "
            "name to `index`."
        )
        raise ValueError(msg)

    self._specification["index"] = {
        "location": f"/{field.location}",
    }

def remove_subset(self, name: str) -> None:
if name not in self._specification["subsets"]:
msg = f"Subset {name} not found in specification"
def remove_field(self, name: str) -> None:
if name not in self._specification["fields"]:
msg = f"Field {name} not found in specification"
raise ValueError(msg)

del self._specification["subsets"][name]
del self._specification["fields"][name]

def evolve( # noqa : PLR0912 (too many branches)
self,
Expand All @@ -274,68 +286,23 @@ def evolve( # noqa : PLR0912 (too many branches)
# Update `component_id` of the metadata
component_id = component_spec.component_folder_name
evolved_manifest.update_metadata(key="component_id", value=component_id)

if run_id is not None:
evolved_manifest.update_metadata(key="run_id", value=run_id)

# Update index location as this is currently always rewritten
evolved_manifest.index._specification[
"location"
] = f"/{self.pipeline_name}/{evolved_manifest.run_id}/{component_id}/index"

# If additionalSubsets is False in consumes,
# Remove all subsets from the manifest that are not listed
if not component_spec.accepts_additional_subsets:
for subset_name in evolved_manifest.subsets:
if subset_name not in component_spec.consumes:
evolved_manifest.remove_subset(subset_name)

# If additionalSubsets is False in produces,
# Remove all subsets from the manifest that are not listed
if not component_spec.outputs_additional_subsets:
for subset_name in evolved_manifest.subsets:
if subset_name not in component_spec.produces:
evolved_manifest.remove_subset(subset_name)

# If additionalFields is False for a consumed subset,
# Remove all fields from that subset that are not listed
for subset_name, subset in component_spec.consumes.items():
if subset_name in evolved_manifest.subsets and not subset.additional_fields:
for field_name in evolved_manifest.subsets[subset_name].fields:
if field_name not in subset.fields:
evolved_manifest.subsets[subset_name].remove_field(
field_name,
)

# For each output subset defined in the component, add or update it
for subset_name, subset in component_spec.produces.items():
# Subset is already in manifest, update it
if subset_name in evolved_manifest.subsets:
# If additional fields are not allowed, remove the fields not defined in the
# component spec produces section
if not subset.additional_fields:
for field_name in evolved_manifest.subsets[subset_name].fields:
if field_name not in subset.fields:
evolved_manifest.subsets[subset_name].remove_field(
field_name,
)

# Add fields defined in the component spec produces section
# Overwrite to persist changes to the field (eg. type of column)
for field in subset.fields.values():
evolved_manifest.subsets[subset_name].add_field(
field.name,
field.type,
overwrite=True,
)

# Update subset location as this is currently always rewritten
evolved_manifest.subsets[subset_name]._specification[
"location"
] = f"/{self.pipeline_name}/{evolved_manifest.run_id}/{component_id}/{subset_name}"

# Subset is not yet in manifest, add it
else:
evolved_manifest.add_subset(subset_name, subset.fields.values())
# Update index location as this is always rewritten
evolved_manifest.add_or_update_field(
Field(name="index", location=component_spec.component_folder_name),
)

# TODO handle additionalFields

# Add or update all produced fields defined in the component spec
for name, field in component_spec.produces.items():
# If field was not part of the input manifest, add field to output manifest.
# If field was part of the input manifest and got produced by the component, update
# the manifest field.
evolved_manifest.add_or_update_field(field, overwrite=True)

return evolved_manifest

Expand Down
Loading

0 comments on commit 06186c1

Please sign in to comment.