Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add writer component #196

Merged
merged 2 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 56 additions & 2 deletions fondant/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def _load_or_create_manifest(self) -> Manifest:
"""Abstract method that returns the dataset manifest."""

@abstractmethod
def _process_dataset(self, manifest: Manifest) -> dd.DataFrame:
def _process_dataset(self, manifest: Manifest) -> t.Union[None, dd.DataFrame]:
"""Abstract method that processes the manifest and
returns another dataframe.
"""
Expand Down Expand Up @@ -223,7 +223,7 @@ def transform(self, *args, **kwargs) -> dd.DataFrame:
kwargs: Arguments provided to the component are passed as keyword arguments
"""

def _process_dataset(self, manifest: Manifest) -> dd.DataFrame:
def _process_dataset(self, manifest: Manifest) -> t.Union[None, dd.DataFrame]:
"""
Creates a DataLoader using the provided manifest and loads the input dataframe using the
`load_dataframe` instance, and applies data transformations to it using the `transform`
Expand All @@ -237,3 +237,57 @@ def _process_dataset(self, manifest: Manifest) -> dd.DataFrame:
df = self.transform(dataframe=df, **self.user_arguments)

return df


class WriteComponent(Component):
"""Base class for a Fondant write component."""

@classmethod
def _add_and_parse_args(cls, spec: ComponentSpec):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is currently the same as the TransformComponent, but I guess the output manifest is not required here?

We still have a lot of duplication in these methods for only small differences, so would like to figure out a better way to do this. Maybe we should still create separate schemas?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tackled this in the PR that follows this one by specifying the optional parameters as a specific attribute per component type.

https://github.com/ml6team/fondant/pull/199/files#:~:text=if%20arg.name%20in%20cls.optional_fondant_arguments()%3A

We could also tackle it here by having separate specs per component type. This would of course need more reworking and would also require specifying the component type within the component spec YAML (which is not currently the case). Is this what you mean by separate schemas?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That could be a better place to do it indeed. But let's merge the current implementation, it's already an improvement.

parser = argparse.ArgumentParser()
component_arguments = cls._get_component_arguments(spec)

for arg in component_arguments.values():
parser.add_argument(
f"--{arg.name}",
type=kubeflow2python_type[arg.type], # type: ignore
required=True,
help=arg.description,
)

return parser.parse_args()

def _load_or_create_manifest(self) -> Manifest:
    """Return the manifest read from the file at `self.input_manifest_path`."""
    manifest_path = self.input_manifest_path
    return Manifest.from_file(manifest_path)

@abstractmethod
def write(self, *args, **kwargs):
    """
    Abstract method to write a dataframe to a final custom location.

    Args:
        args: Not used by `_process_dataset` of this class, which passes everything by keyword
        kwargs: The loaded dataframe is passed in as the `dataframe` keyword argument, and the
            arguments provided to the component are passed as further keyword arguments
    """

def _process_dataset(self, manifest: Manifest) -> t.Union[None, dd.DataFrame]:
    """
    Load the input dataframe described by the manifest and hand it to `write`.

    A `DaskDataLoader` is created from the provided manifest and the component spec. The
    dataframe it loads is passed to the `write` method implemented by the derived class as the
    `dataframe` keyword argument, together with `self.user_arguments` as further keyword
    arguments.

    Args:
        manifest: The manifest given to the data loader

    Returns:
        Always `None`; no dataframe is returned from this method.
    """
    loader = DaskDataLoader(manifest=manifest, component_spec=self.spec)
    loaded_dataframe = loader.load_dataframe()
    self.write(dataframe=loaded_dataframe, **self.user_arguments)

    return None

def _write_data(self, dataframe: dd.DataFrame, *, manifest: Manifest):
"""Create a data writer given a manifest and writes out the index and subsets."""
pass

def upload_manifest(self, manifest: Manifest, save_path: str):
    """
    Do nothing: the manifest and save path are accepted and ignored, and `None` is returned.

    NOTE(review): presumably this disables a manifest upload done by the base class -- confirm
    against `Component`.
    """
    return None
43 changes: 34 additions & 9 deletions tests/test_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import pytest
import yaml

from fondant.component import LoadComponent, TransformComponent
from fondant.component import LoadComponent, TransformComponent, WriteComponent
from fondant.data_io import DaskDataLoader

components_path = Path(__file__).parent / "example_specs/components"
Expand Down Expand Up @@ -75,8 +75,10 @@ def test_component(mock_args):
}


def test_valid_transform_kwargs(monkeypatch):
"""Test that arguments are passed correctly to `Component.transform` method."""
def test_transform_component(monkeypatch):
"""Test that arguments are passed correctly to `Component.transform` method and that valid
errors are returned when required arguments are missing.
"""

class EarlyStopException(Exception):
"""Used to stop execution early instead of mocking all later functionality."""
Expand Down Expand Up @@ -120,12 +122,23 @@ def transform(self, dataframe, *, flag, value):

# Instantiate and run component
component = MyComponent.from_args()

with pytest.raises(EarlyStopException):
component.run()

# Remove component specs from arguments
component_spec_index = sys.argv.index("--component_spec")
del sys.argv[component_spec_index : component_spec_index + 2]

# Instantiate and run component
with pytest.raises(ValueError):
MyComponent.from_args()


def test_invalid_transform_kwargs(monkeypatch):
"""Test that arguments are passed correctly to `Component.transform` method."""
def test_write_component(tmp_path_factory, monkeypatch):
"""Test that arguments are passed correctly to `Component.write` method and that valid
errors are returned when required arguments are missing.
"""

class EarlyStopException(Exception):
"""Used to stop execution early instead of mocking all later functionality."""
Expand All @@ -141,14 +154,16 @@ def mocked_load_dataframe(self):
component_spec = arguments_dir / "component.yaml"
input_manifest = arguments_dir / "input_manifest.json"

yaml_file_to_json_string(component_spec)
component_spec_string = yaml_file_to_json_string(component_spec)

# Implemented Component class
class MyComponent(TransformComponent):
def transform(self, dataframe, *, flag, value):
class MyComponent(WriteComponent):
def write(self, dataframe, *, flag, value):
assert flag == "success"
assert value == 1
raise EarlyStopException()
# Mock write function that sinks final data to a local directory
with tmp_path_factory.mktemp("temp") as fn:
dataframe.to_parquet(fn)

# Mock CLI arguments
sys.argv = [
Expand All @@ -163,8 +178,18 @@ def transform(self, dataframe, *, flag, value):
"1",
"--output_manifest_path",
"",
"--component_spec",
f"{component_spec_string}",
]

# # Instantiate and run component
component = MyComponent.from_args()
component.run()

# Remove component specs from arguments
component_spec_index = sys.argv.index("--component_spec")
del sys.argv[component_spec_index : component_spec_index + 2]

# Instantiate and run component
with pytest.raises(ValueError):
MyComponent.from_args()