Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update core package #653

Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 7 additions & 45 deletions src/fondant/core/component_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,34 +66,6 @@ def kubeflow_type(self) -> str:
return lookup[self.type]


class ComponentSubset:
"""
Class representing a Fondant Component subset.

Args:
specification: the part of the component json representing the subset
"""

def __init__(self, specification: t.Dict[str, t.Any]) -> None:
self._specification = specification

def __repr__(self) -> str:
return f"{self.__class__.__name__}({self._specification!r})"

@property
def fields(self) -> t.Mapping[str, Field]:
return types.MappingProxyType(
{
name: Field(name=name, type=Type.from_json(field))
for name, field in self._specification["fields"].items()
},
)

@property
def additional_fields(self) -> bool:
return self._specification.get("additionalFields", True)


class ComponentSpec:
"""
Class representing a Fondant component specification.
Expand Down Expand Up @@ -191,38 +163,28 @@ def tags(self) -> t.List[str]:

@property
def index(self):
mrchtr marked this conversation as resolved.
Show resolved Hide resolved
return ComponentSubset({"fields": {}})
return Field(name="index", location=self._specification["index"].location)
RobbeSneyders marked this conversation as resolved.
Show resolved Hide resolved

@property
def consumes(self) -> t.Mapping[str, ComponentSubset]:
def consumes(self) -> t.Mapping[str, Field]:
"""The subsets consumed by the component as an immutable mapping."""
mrchtr marked this conversation as resolved.
Show resolved Hide resolved
return types.MappingProxyType(
{
name: ComponentSubset(subset)
for name, subset in self._specification.get("consumes", {}).items()
if name != "additionalSubsets"
name: Field(name=name, type=Type.from_json(field))
for name, field in self._specification["consumes"].items()
},
)

@property
def produces(self) -> t.Mapping[str, ComponentSubset]:
def produces(self) -> t.Mapping[str, Field]:
"""The subsets produced by the component as an immutable mapping."""
mrchtr marked this conversation as resolved.
Show resolved Hide resolved
return types.MappingProxyType(
{
name: ComponentSubset(subset)
for name, subset in self._specification.get("produces", {}).items()
if name != "additionalSubsets"
name: Field(name=name, type=Type.from_json(field))
for name, field in self._specification["produces"].items()
},
)

@property
def accepts_additional_subsets(self) -> bool:
return self._specification.get("consumes", {}).get("additionalSubsets", True)

@property
def outputs_additional_subsets(self) -> bool:
return self._specification.get("produces", {}).get("additionalSubsets", True)

@property
def args(self) -> t.Mapping[str, Argument]:
args = self.default_arguments
Expand Down
228 changes: 97 additions & 131 deletions src/fondant/core/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,59 +18,6 @@
from fondant.core.schema import Field, Type


class Subset:
"""
Class representing a Fondant subset.

Args:
specification: The part of the manifest json representing the subset
base_path: The base path which the subset location is defined relative to
"""

def __init__(self, specification: dict, *, base_path: str) -> None:
self._specification = specification
self._base_path = base_path

@property
def location(self) -> str:
"""The absolute location of the subset."""
return self._base_path + self._specification["location"]

@property
def fields(self) -> t.Mapping[str, Field]:
"""The fields of the subset returned as an immutable mapping."""
return types.MappingProxyType(
{
name: Field(name=name, type=Type.from_json(field))
for name, field in self._specification["fields"].items()
},
)

def add_field(self, name: str, type_: Type, *, overwrite: bool = False) -> None:
if not overwrite and name in self._specification["fields"]:
msg = f"A field with name {name} already exists"
raise ValueError(msg)

self._specification["fields"][name] = type_.to_json()

def remove_field(self, name: str) -> None:
del self._specification["fields"][name]

def __repr__(self) -> str:
return f"{self.__class__.__name__}({self._specification!r})"


class Index(Subset):
"""Special case of a subset for the index, which has fixed fields."""

@property
def fields(self) -> t.Dict[str, Field]:
return {
"id": Field(name="id", type=Type("string")),
"source": Field(name="source", type=Type("string")),
}


@dataclass
class Metadata:
"""
Expand Down Expand Up @@ -171,8 +118,8 @@ def create(

specification = {
"metadata": metadata.to_dict(),
"index": {"location": f"/{pipeline_name}/{run_id}/{component_id}/index"},
"subsets": {},
"index": {"location": f"/{component_id}"},
"fields": {},
}
return cls(specification)

Expand All @@ -196,13 +143,41 @@ def copy(self) -> "Manifest":
def metadata(self) -> t.Dict[str, t.Any]:
return self._specification["metadata"]

@property
def index(self) -> t.Dict[str, t.Any]:
return self._specification["index"]
mrchtr marked this conversation as resolved.
Show resolved Hide resolved

def update_metadata(self, key: str, value: t.Any) -> None:
self.metadata[key] = value

@property
def base_path(self) -> str:
return self.metadata["base_path"]

@property
def field_mapping(self):
RobbeSneyders marked this conversation as resolved.
Show resolved Hide resolved
"""
Retrieve a mapping of field locations to corresponding field names.
A dictionary where keys are field locations and values are lists
of column names.

Example:
{
"/base_path/component_1": ["Name", "HP"],
"/base_path/component_2": ["Type 1", "Type 2"],
}
"""
field_mapping = {}
for field_name, field in self.fields.items():
location = (
f"{self.base_path}/{self.pipeline_name}/{self.run_id}{field.location}"
RobbeSneyders marked this conversation as resolved.
Show resolved Hide resolved
)
if location in field_mapping:
field_mapping[location].append(field_name)
else:
field_mapping[location] = [field_name]
return field_mapping

@property
def run_id(self) -> str:
return self.metadata["run_id"]
Expand All @@ -220,41 +195,76 @@ def cache_key(self) -> str:
return self.metadata["cache_key"]

@property
def index(self) -> Index:
return Index(self._specification["index"], base_path=self.base_path)

@property
def subsets(self) -> t.Mapping[str, Subset]:
def fields(self) -> t.Mapping[str, Field]:
"""The subsets of the manifest as an immutable mapping."""
mrchtr marked this conversation as resolved.
Show resolved Hide resolved
# e.g. ('images', {'location': '/component1', 'type': 'binary'})
mrchtr marked this conversation as resolved.
Show resolved Hide resolved
return types.MappingProxyType(
{
name: Subset(subset, base_path=self.base_path)
for name, subset in self._specification["subsets"].items()
name: Field(
name=name,
type=Type(field["type"]),
location=field["location"],
)
for name, field in self._specification["fields"].items()
},
)

def add_subset(
def add_fields(
mrchtr marked this conversation as resolved.
Show resolved Hide resolved
self,
name: str,
fields: t.Iterable[t.Union[Field, t.Tuple[str, Type]]],
fields: t.Iterable[Field],
) -> None:
if name in self._specification["subsets"]:
msg = f"A subset with name {name} already exists"
"""Add fields to manifest."""
for field in fields:
if field.name in self._specification["fields"]:
msg = f"A field with name {field.name} already exists"
raise ValueError(msg)

self.add_or_update_field(field, overwrite=False)

def add_or_update_field(self, field: Field, overwrite: bool = False):
"""Add or update field to manifest."""
if field.name == "index":
self._add_or_update_index(field, overwrite=True)
elif overwrite is False and field.name in self._specification["fields"]:
msg = (
f"A field with name {field.name} already exists. Set overwrite to true, "
f"if you want to update the field."
)
raise ValueError(msg)
else:
self._specification["fields"][field.name] = {
"location": f"/{self.component_id}",
"type": field.type.name,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment below. We should store them as json mainly to match the format of the component spec

Suggested change
"type": field.type.name,
"type": field.type.to_json(),

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Wasn't aware of this, but it makes totally sense!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This broke some of the tests though.

>>> type.to_json()

{
  "type": type
}

So now we get:

{
  "location": ...,
  "type": {
    "type": type
  }
}

I think we can solve it like this:

self._specification["fields"][field.name] = {
  "location": f"/{self.component_id}",
  **field.type.name,
}

}

def _add_or_update_index(self, field: Field, overwrite: bool = True):
"""Add or update the manifest index."""
if overwrite is False:
msg = (
"The index already exists. Set overwrite to true, "
"if you want to update the index."
)
raise ValueError(msg)

if field.name != "index":
msg = (
f"The field name is {field.name}. If you try to update the index, set the field"
f"name to `index`."
)
raise ValueError(msg)

self._specification["subsets"][name] = {
"location": f"/{self.pipeline_name}/{self.run_id}/{self.component_id}/{name}",
"fields": {name: type_.to_json() for name, type_ in fields},
self._specification["index"] = {
"location": f"/{self.component_id}",
}

def remove_subset(self, name: str) -> None:
if name not in self._specification["subsets"]:
msg = f"Subset {name} not found in specification"
def remove_field(self, name: str) -> None:
if name not in self._specification["fields"]:
msg = f"Field {name} not found in specification"
raise ValueError(msg)

del self._specification["subsets"][name]
del self._specification["fields"][name]

def evolve( # noqa : PLR0912 (too many branches)
def evolve( # : PLR0912 (too many branches)
mrchtr marked this conversation as resolved.
Show resolved Hide resolved
self,
component_spec: ComponentSpec,
*,
Expand All @@ -274,68 +284,24 @@ def evolve( # noqa : PLR0912 (too many branches)
# Update `component_id` of the metadata
component_id = component_spec.component_folder_name
evolved_manifest.update_metadata(key="component_id", value=component_id)

if run_id is not None:
evolved_manifest.update_metadata(key="run_id", value=run_id)

# Update index location as this is currently always rewritten
mrchtr marked this conversation as resolved.
Show resolved Hide resolved
evolved_manifest.index._specification[
"location"
] = f"/{self.pipeline_name}/{evolved_manifest.run_id}/{component_id}/index"

# If additionalSubsets is False in consumes,
# Remove all subsets from the manifest that are not listed
if not component_spec.accepts_additional_subsets:
for subset_name in evolved_manifest.subsets:
if subset_name not in component_spec.consumes:
evolved_manifest.remove_subset(subset_name)

# If additionalSubsets is False in produces,
# Remove all subsets from the manifest that are not listed
if not component_spec.outputs_additional_subsets:
for subset_name in evolved_manifest.subsets:
if subset_name not in component_spec.produces:
evolved_manifest.remove_subset(subset_name)

# If additionalFields is False for a consumed subset,
# Remove all fields from that subset that are not listed
for subset_name, subset in component_spec.consumes.items():
if subset_name in evolved_manifest.subsets and not subset.additional_fields:
for field_name in evolved_manifest.subsets[subset_name].fields:
if field_name not in subset.fields:
evolved_manifest.subsets[subset_name].remove_field(
field_name,
)
evolved_manifest.add_or_update_field(Field(name="index"))
mrchtr marked this conversation as resolved.
Show resolved Hide resolved
# evolved_manifest._specification["index"][
# "location"
# ] = f"/{self.pipeline_name}/{evolved_manifest.run_id}/{component_id}"

# TODO handle additionalFields

# For each output subset defined in the component, add or update it
mrchtr marked this conversation as resolved.
Show resolved Hide resolved
for subset_name, subset in component_spec.produces.items():
# Subset is already in manifest, update it
if subset_name in evolved_manifest.subsets:
# If additional fields are not allowed, remove the fields not defined in the
# component spec produces section
if not subset.additional_fields:
for field_name in evolved_manifest.subsets[subset_name].fields:
if field_name not in subset.fields:
evolved_manifest.subsets[subset_name].remove_field(
field_name,
)

# Add fields defined in the component spec produces section
# Overwrite to persist changes to the field (eg. type of column)
for field in subset.fields.values():
evolved_manifest.subsets[subset_name].add_field(
field.name,
field.type,
overwrite=True,
)

# Update subset location as this is currently always rewritten
evolved_manifest.subsets[subset_name]._specification[
"location"
] = f"/{self.pipeline_name}/{evolved_manifest.run_id}/{component_id}/{subset_name}"

# Subset is not yet in manifest, add it
else:
evolved_manifest.add_subset(subset_name, subset.fields.values())
for name, field in component_spec.produces.items():
# If field was part not part of the input manifest, add field to output manifest.
mrchtr marked this conversation as resolved.
Show resolved Hide resolved
# If field was part of the input manifest and got produced by the component, update
# the manifest field.
evolved_manifest.add_or_update_field(field, overwrite=True)

return evolved_manifest

Expand Down
Loading
Loading