Skip to content

Commit

Permalink
Introduce new pipeline interface functionality (#695)
Browse files Browse the repository at this point in the history
PR that introduces functionality to new pipeline interface as discussed
[here](#567 (comment))

* The component spec now accepts **OneOf** additionalFields or Fields in
its `consumes` and `produces` sections
* The new `consumes` and `produces` are defined at the Op level
similarly to the ones in the component spec, if they are present, they
will override the default `consumes` and `produces` defined in the
component spec (manifest, dataIO)
* Some changes were added to `DataIO` just to resolve test issues, but
the new functionality of the custom consumes and produces is not
implemented yet (will be tackled in a separate PR)

---------

Co-authored-by: Robbe Sneyders <[email protected]>
  • Loading branch information
PhilippeMoussalli and RobbeSneyders committed Dec 7, 2023
1 parent 18c86fb commit 12912d3
Show file tree
Hide file tree
Showing 21 changed files with 688 additions and 91 deletions.
168 changes: 167 additions & 1 deletion src/fondant/core/component_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
from pathlib import Path

import jsonschema.exceptions
import pyarrow as pa
import yaml
from jsonschema import Draft4Validator
from referencing import Registry, Resource
from referencing.jsonschema import DRAFT4

from fondant.core.exceptions import InvalidComponentSpec
from fondant.core.exceptions import InvalidComponentSpec, InvalidPipelineDefinition
from fondant.core.schema import Field, Type


Expand Down Expand Up @@ -168,6 +169,7 @@ def consumes(self) -> t.Mapping[str, Field]:
{
name: Field(name=name, type=Type.from_json(field))
for name, field in self._specification.get("consumes", {}).items()
if name != "additionalProperties"
},
)

Expand All @@ -178,9 +180,22 @@ def produces(self) -> t.Mapping[str, Field]:
{
name: Field(name=name, type=Type.from_json(field))
for name, field in self._specification.get("produces", {}).items()
if name != "additionalProperties"
},
)

def is_generic(self, mapping: str) -> bool:
    """Tell whether a section of the specification accepts additional fields.

    Args:
        mapping: "consumes" or "produces"

    Returns:
        The truthiness of the section's "additionalProperties" entry;
        False when the section or the entry is absent.
    """
    section = self._specification.get(mapping, {})
    allows_additional = section.get("additionalProperties")
    return bool(allows_additional)

@property
def previous_index(self) -> t.Optional[str]:
return self._specification.get("previous_index")
Expand Down Expand Up @@ -271,6 +286,157 @@ def __eq__(self, other):
return self._specification == other._specification


class OperationSpec:
    """A spec for the operation, which contains the `consumes` and `produces` sections of the
    component spec, updated with the `consumes` and `produces` mappings provided as arguments to
    the operation.

    Two views are exposed for each section:

    * the "inner" mapping: the component spec section extended with the fields whose
      argument value is a pyarrow data type;
    * the "outer" mapping: the inner mapping with fields renamed according to the
      argument entries whose value is a string.

    All four mappings are computed lazily on first access and cached on the instance.
    """

    def __init__(
        self,
        specification: ComponentSpec,
        *,
        consumes: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None,
        produces: t.Optional[t.Dict[str, t.Union[str, pa.DataType]]] = None,
    ) -> None:
        """Store the component spec and the argument mappings after validating their types.

        Args:
            specification: the component spec the operation is based on
            consumes: optional mapping of field name to either a string or a pyarrow data type
            produces: optional mapping of field name to either a string or a pyarrow data type

        Raises:
            InvalidPipelineDefinition: if a mapping value is neither a string nor a pyarrow
                data type
        """
        self.specification = specification

        self._mappings = {
            "consumes": consumes,
            "produces": produces,
        }
        self._validate_mappings()

        # Caches for the lazily computed mappings; `None` means "not computed yet".
        self._inner_consumes: t.Optional[t.Mapping[str, Field]] = None
        self._outer_consumes: t.Optional[t.Mapping[str, Field]] = None
        self._inner_produces: t.Optional[t.Mapping[str, Field]] = None
        self._outer_produces: t.Optional[t.Mapping[str, Field]] = None

    def _validate_mappings(self) -> None:
        """Validate received consumes and produces mappings on their types.

        Raises:
            InvalidPipelineDefinition: if a value is neither a string nor a pyarrow data type
        """
        for name, mapping in self._mappings.items():
            # Missing (`None`) and empty mappings have nothing to validate.
            if not mapping:
                continue
            for key, value in mapping.items():
                if not isinstance(value, (str, pa.DataType)):
                    msg = f"Unexpected type {type(value)} received for key {key} in {name} mapping"
                    raise InvalidPipelineDefinition(msg)

    def _inner_mapping(self, name: str) -> t.Mapping[str, Field]:
        """Calculate the "inner mapping" of the operation. This is the mapping that the component
        `transform` (or equivalent) method will receive. This is calculated by starting from the
        component spec section, and updating it with any string to type mappings from the
        argument mapping.

        Args:
            name: "consumes" or "produces"

        Raises:
            InvalidPipelineDefinition: if a pyarrow data type is provided while the component
                spec section is not generic, or if it conflicts with the type already defined
                for that field in the component spec
        """
        spec_mapping = getattr(self.specification, name)
        args_mapping = self._mappings[name]

        if not args_mapping:
            return spec_mapping

        mapping = dict(spec_mapping)

        for key, value in args_mapping.items():
            # String values are renames; they are handled by `_outer_mapping`.
            if not isinstance(value, pa.DataType):
                continue

            if not self.specification.is_generic(name):
                msg = (
                    f"Component {self.specification.name} does not allow specifying additional "
                    f"fields but received {key}."
                )
                raise InvalidPipelineDefinition(msg)

            if key not in spec_mapping:
                mapping[key] = Field(name=key, type=Type(value))
                continue

            # The field already exists in the component spec: identical type information is
            # accepted silently, anything else is a conflict.
            spec_type = spec_mapping[key].type.value
            if spec_type != value:
                msg = (
                    f"Received pyarrow DataType value {value} for key {key} in the "
                    f"`{name}` argument passed to the operation, but {key} is "
                    f"already defined in the `{name}` section of the component spec "
                    f"with type {spec_type}"
                )
                raise InvalidPipelineDefinition(msg)

        return types.MappingProxyType(mapping)

    def _outer_mapping(self, name: str) -> t.Mapping[str, Field]:
        """Calculate the "outer mapping" of the operation. This is the mapping that the dataIO
        needs to read / write. This is calculated by starting from the "inner mapping" updating it
        with any string to string mappings from the argument mapping.

        Args:
            name: "consumes" or "produces"

        Raises:
            InvalidPipelineDefinition: if a string value is provided for a key that is not
                part of the inner mapping
        """
        inner_mapping = getattr(self, f"inner_{name}")
        args_mapping = self._mappings[name]

        if not args_mapping:
            return inner_mapping

        mapping = dict(inner_mapping)
        renamed: t.Dict[str, Field] = {}

        for key, value in args_mapping.items():
            # pyarrow data type values were already handled by `_inner_mapping`.
            if not isinstance(value, str):
                continue

            if key not in inner_mapping:
                msg = (
                    f"Received a string value for key {key} in the `{name}` "
                    f"argument passed to the operation, but {key} is not defined in "
                    f"the `{name}` section of the component spec."
                )
                raise InvalidPipelineDefinition(msg)

            # Read the type from the untouched inner mapping and collect the renamed fields
            # separately. Renaming in place would let a later rename pop a field that an
            # earlier rename just inserted (e.g. {"a": "b", "b": "a"}), silently dropping
            # fields or assigning them the wrong type.
            renamed[value] = Field(name=value, type=inner_mapping[key].type)
            mapping.pop(key, None)

        # Insert the renamed fields only after every old name has been removed.
        mapping.update(renamed)

        return types.MappingProxyType(mapping)

    @property
    def inner_consumes(self) -> t.Mapping[str, Field]:
        """The "inner" `consumes` mapping which the component `transform` (or equivalent) method
        will receive.
        """
        if self._inner_consumes is None:
            self._inner_consumes = self._inner_mapping("consumes")

        return self._inner_consumes

    @property
    def outer_consumes(self) -> t.Mapping[str, Field]:
        """The "outer" `consumes` mapping which the dataIO needs to read / write."""
        if self._outer_consumes is None:
            self._outer_consumes = self._outer_mapping("consumes")

        return self._outer_consumes

    @property
    def inner_produces(self) -> t.Mapping[str, Field]:
        """The "inner" `produces` mapping which the component `transform` (or equivalent) method
        will receive.
        """
        if self._inner_produces is None:
            self._inner_produces = self._inner_mapping("produces")

        return self._inner_produces

    @property
    def outer_produces(self) -> t.Mapping[str, Field]:
        """The "outer" `produces` mapping which the dataIO needs to read / write."""
        if self._outer_produces is None:
            self._outer_produces = self._outer_mapping("produces")

        return self._outer_produces


class KubeflowComponentSpec:
"""
Class representing a Kubeflow component specification.
Expand Down
22 changes: 16 additions & 6 deletions src/fondant/core/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from referencing import Registry, Resource
from referencing.jsonschema import DRAFT4

from fondant.core.component_spec import ComponentSpec
from fondant.core.component_spec import ComponentSpec, OperationSpec
from fondant.core.exceptions import InvalidManifest
from fondant.core.schema import Field, Type

Expand Down Expand Up @@ -240,7 +240,7 @@ def remove_field(self, name: str) -> None:

def evolve( # : PLR0912 (too many branches)
self,
component_spec: ComponentSpec,
component_spec: t.Union[ComponentSpec, OperationSpec],
*,
run_id: t.Optional[str] = None,
) -> "Manifest":
Expand All @@ -255,25 +255,35 @@ def evolve( # : PLR0912 (too many branches)
"""
evolved_manifest = self.copy()

# TODO: clean up when component SDK has been updated to use UpdatedComponentSpec
if isinstance(component_spec, ComponentSpec):
specification = component_spec
produces = component_spec.produces
elif isinstance(component_spec, OperationSpec):
specification = component_spec.specification
produces = component_spec.outer_produces
else:
raise ValueError

# Update `component_id` of the metadata
component_id = component_spec.component_folder_name
component_id = specification.component_folder_name
evolved_manifest.update_metadata(key="component_id", value=component_id)

if run_id is not None:
evolved_manifest.update_metadata(key="run_id", value=run_id)

# Update index location as this is always rewritten
evolved_manifest.add_or_update_field(
Field(name="index", location=component_spec.component_folder_name),
Field(name="index", location=component_id),
)

# Remove all previous fields if the component changes the index
if component_spec.previous_index:
if specification.previous_index:
for field_name in evolved_manifest.fields:
evolved_manifest.remove_field(field_name)

# Add or update all produced fields defined in the component spec
for name, field in component_spec.produces.items():
for name, field in produces.items():
# If field was not part of the input manifest, add field to output manifest.
# If field was part of the input manifest and got produced by the component, update
# the manifest field.
Expand Down
18 changes: 16 additions & 2 deletions src/fondant/core/schemas/common.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,28 @@
]
}
},
"required": ["type"],
"required": [
"type"
],
"additionalProperties": false
},
"additionalProperties": {
"type": "boolean",
"default": {},
"description": "Specifies whether additional fields are allowed"
},
"fields": {
"type": "object",
"minProperties": 1,
"additionalProperties": {
"$ref": "#/definitions/field"
"anyOf": [
{
"$ref": "#/definitions/field"
},
{
"$ref": "#/definitions/additionalProperties"
}
]
}
}
}
Expand Down
Loading

0 comments on commit 12912d3

Please sign in to comment.