Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of new pipeline interface #665

Closed
wants to merge 38 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit. Hold Shift and click to select a range.
35338b6
Update component spec schema validation
mrchtr Nov 16, 2023
a269e3c
Update component spec tests to validate new component spec
mrchtr Nov 16, 2023
ad0dab6
Add additional fields to json schema
mrchtr Nov 16, 2023
7b91535
Update manifest json schema for validation
mrchtr Nov 16, 2023
5d1bf5e
Update manifest creation
mrchtr Nov 17, 2023
d8ecd01
Reduce PR to core module
mrchtr Nov 21, 2023
12c78ca
Addresses comments
mrchtr Nov 21, 2023
c1cad60
Restructure test directory
mrchtr Nov 21, 2023
fd0699c
Remove additional fields in common.json
mrchtr Nov 21, 2023
0f8117f
Test structure
mrchtr Nov 21, 2023
7e8a1d6
Refactor component package
mrchtr Nov 21, 2023
9f67c61
Update src/fondant/core/component_spec.py
mrchtr Nov 21, 2023
40955bf
Update src/fondant/core/manifest.py
mrchtr Nov 21, 2023
6b246a4
Update src/fondant/core/component_spec.py
mrchtr Nov 21, 2023
8ef38d9
Update src/fondant/core/manifest.py
mrchtr Nov 21, 2023
e8c8135
Update src/fondant/core/schema.py
mrchtr Nov 21, 2023
df9a60e
Addresses comments
mrchtr Nov 21, 2023
2256118
Addresses comments
mrchtr Nov 21, 2023
3042fb5
Addresses comments
mrchtr Nov 21, 2023
8fa8be7
Update src/fondant/core/manifest.py
mrchtr Nov 21, 2023
25eb492
Addresses comments
mrchtr Nov 22, 2023
c0fb47a
Merge branch 'feature/implement-new-dataset-format' into feautre/refa…
mrchtr Nov 22, 2023
0701662
Addresses comments
mrchtr Nov 22, 2023
365ca6d
Update test examples
mrchtr Nov 22, 2023
4dc7dc7
Update src/fondant/core/manifest.py
mrchtr Nov 22, 2023
a60ca3e
addresses comments
mrchtr Nov 22, 2023
d2182a0
Merge feature/implement-new-dataset-format into feature/refactore-com…
mrchtr Nov 22, 2023
e141231
Adjust interface for usage of produces and consumes
mrchtr Nov 22, 2023
f3e0a6a
Adjust interface for usage of schema, consumes, and produces
mrchtr Nov 22, 2023
b4fe222
Update core package (#653)
mrchtr Nov 23, 2023
bb3b623
Refactor component package (#654)
mrchtr Nov 23, 2023
e4eadf3
Use new data format (#667)
mrchtr Nov 24, 2023
ae72104
Merge redesign-dataset-format-and-interface into feature/implement-n…
mrchtr Nov 24, 2023
f0344c8
Resolve conflicts
mrchtr Nov 24, 2023
826f061
Addressing comments
mrchtr Nov 24, 2023
4bb35a4
Overwriting consumes and produces of component specification
mrchtr Nov 24, 2023
e7a960f
Consumes and produces renaming
mrchtr Nov 24, 2023
045769f
Merge branch 'main' into feature/implement-new-pipeline-interface
RobbeSneyders Nov 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/fondant/component/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ class BaseComponent:
**kwargs: The provided user arguments are passed in as keyword arguments
"""

def __init__(self, spec: ComponentSpec, **kwargs):
def __init__(
    self,
    spec: ComponentSpec,
    schema: t.Optional[t.Dict[str, str]] = None,
    **kwargs,
):
    # Intentional no-op: BaseComponent stores nothing itself; subclasses
    # decide what to do with the component spec, the optional schema
    # mapping (NOTE(review): appears to map column names to type strings —
    # confirm against the executor that passes it), and the user-provided
    # keyword arguments.
    pass


Expand Down
13 changes: 13 additions & 0 deletions src/fondant/component/data_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ def __init__(
manifest: Manifest,
component_spec: ComponentSpec,
input_partition_rows: t.Optional[int] = None,
consumes: t.Optional[t.Dict[str, str]] = None,
):
super().__init__(manifest=manifest, component_spec=component_spec)
self.input_partition_rows = input_partition_rows
self.consumes = consumes

def partition_loaded_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame:
"""
Expand Down Expand Up @@ -131,6 +133,10 @@ def load_dataframe(self) -> dd.DataFrame:

logging.info(f"Columns of dataframe: {list(dataframe.columns)}")

# Renaming dataframe columns
if self.consumes:
dataframe = dataframe.rename(columns=self.consumes)

return dataframe


Expand All @@ -140,8 +146,10 @@ def __init__(
*,
manifest: Manifest,
component_spec: ComponentSpec,
produces: t.Optional[t.Dict[str, str]] = None,
):
super().__init__(manifest=manifest, component_spec=component_spec)
self.produces = produces

def write_dataframe(
self,
Expand All @@ -158,6 +166,11 @@ def write_dataframe(
self.validate_dataframe_columns(dataframe, columns_to_produce)

dataframe = dataframe[columns_to_produce]

# Renaming dataframe produces
if self.produces:
dataframe = dataframe.rename(columns=self.produces)

write_task = self._write_dataframe(dataframe)

with ProgressBar():
Expand Down
52 changes: 49 additions & 3 deletions src/fondant/component/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ def __init__(
input_partition_rows: int,
cluster_type: t.Optional[str] = None,
client_kwargs: t.Optional[dict] = None,
schema: t.Optional[t.Dict[str, str]] = None,
consumes: t.Optional[t.Dict[str, str]] = None,
produces: t.Optional[t.Dict[str, str]] = None,
) -> None:
self.spec = spec
self.cache = cache
Expand All @@ -75,6 +78,10 @@ def __init__(
self.metadata = Metadata.from_dict(metadata)
self.user_arguments = user_arguments
self.input_partition_rows = input_partition_rows
self.schema = schema

self.consumes = consumes
self.produces = produces

if cluster_type == "local":
client_kwargs = client_kwargs or {
Expand Down Expand Up @@ -112,6 +119,10 @@ def from_args(cls) -> "Executor":
parser.add_argument("--input_partition_rows", type=int)
parser.add_argument("--cluster_type", type=str)
parser.add_argument("--client_kwargs", type=json.loads)
parser.add_argument("--schema", type=json.loads)
parser.add_argument("--consumes", type=json.loads)
parser.add_argument("--produces", type=json.loads)

args, _ = parser.parse_known_args()

if "component_spec" not in args:
Expand All @@ -123,13 +134,19 @@ def from_args(cls) -> "Executor":
cache = args.cache
cluster_type = args.cluster_type
client_kwargs = args.client_kwargs
schema = args.schema
consumes = args.consumes
produces = args.produces

return cls.from_spec(
component_spec,
cache=cache,
input_partition_rows=input_partition_rows,
cluster_type=cluster_type,
client_kwargs=client_kwargs,
schema=schema,
consumes=consumes,
produces=produces,
)

@classmethod
Expand All @@ -141,6 +158,9 @@ def from_spec(
input_partition_rows: int,
cluster_type: t.Optional[str],
client_kwargs: t.Optional[dict],
schema: t.Optional[dict],
consumes: t.Optional[dict],
produces: t.Optional[dict],
) -> "Executor":
"""Create an executor from a component spec."""
args_dict = vars(cls._add_and_parse_args(component_spec))
Expand All @@ -160,6 +180,15 @@ def from_spec(
if "client_kwargs" in args_dict:
args_dict.pop("client_kwargs")

if "schema" in args_dict:
args_dict.pop("schema")

if "consumes" in args_dict:
args_dict.pop("consumes")

if "produces" in args_dict:
args_dict.pop("produces")

input_manifest_path = args_dict.pop("input_manifest_path")
output_manifest_path = args_dict.pop("output_manifest_path")
metadata = args_dict.pop("metadata")
Expand All @@ -175,6 +204,9 @@ def from_spec(
input_partition_rows=input_partition_rows,
cluster_type=cluster_type,
client_kwargs=client_kwargs,
schema=schema,
consumes=consumes,
produces=produces,
)

@classmethod
Expand Down Expand Up @@ -251,11 +283,18 @@ def _execute_component(
A Dask DataFrame containing the output data
"""

def _write_data(self, dataframe: dd.DataFrame, *, manifest: Manifest):
def _write_data(
    self,
    dataframe: dd.DataFrame,
    *,
    manifest: Manifest,
    produces: t.Optional[t.Dict[str, str]],
):
    """Write the output dataframe to the locations described by the manifest.

    Args:
        dataframe: Dask dataframe produced by the component.
        manifest: Output manifest describing where the data is written.
        produces: Optional column-rename mapping forwarded to the writer
            (applied to the dataframe before writing).
    """
    writer = DaskDataWriter(
        manifest=manifest,
        component_spec=self.spec,
        produces=produces,
    )
    writer.write_dataframe(dataframe, self.client)
Expand Down Expand Up @@ -331,7 +370,7 @@ def _run_execution(
input_manifest: Manifest,
) -> Manifest:
logging.info("Executing component")
component = component_cls(self.spec, **self.user_arguments)
component = component_cls(self.spec, self.schema, **self.user_arguments)
output_df = self._execute_component(
component,
manifest=input_manifest,
Expand All @@ -340,7 +379,11 @@ def _run_execution(
component_spec=self.spec,
run_id=self.metadata.run_id,
)
self._write_data(dataframe=output_df, manifest=output_manifest)
self._write_data(
dataframe=output_df,
manifest=output_manifest,
produces=self.produces,
)

return output_manifest

Expand Down Expand Up @@ -478,6 +521,7 @@ def _execute_component(
manifest=manifest,
component_spec=self.spec,
input_partition_rows=self.input_partition_rows,
consumes=self.consumes,
)
dataframe = data_loader.load_dataframe()
return component.transform(dataframe)
Expand Down Expand Up @@ -530,6 +574,7 @@ def _execute_component(
manifest=manifest,
component_spec=self.spec,
input_partition_rows=self.input_partition_rows,
consumes=self.consumes,
)
dataframe = data_loader.load_dataframe()

Expand Down Expand Up @@ -574,6 +619,7 @@ def _execute_component(
manifest=manifest,
component_spec=self.spec,
input_partition_rows=self.input_partition_rows,
consumes=self.consumes,
)
dataframe = data_loader.load_dataframe()
component.write(dataframe)
Expand Down
Loading
Loading