Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create separate class for metadata #372

Merged
merged 7 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ the data is stored and references to the pipeline and component that were used t
```json
{
"metadata": {
"pipeline_name": "pipeline_name",
"base_path": "gs://bucket",
"run_id": "12345",
"run_id": "pipeline_name_12345",
"component_id": "67890"
}
}
Expand Down
72 changes: 28 additions & 44 deletions src/fondant/compiler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import datetime
import json
import logging
import typing as t
Expand All @@ -8,6 +7,7 @@

import yaml

from fondant.manifest import Metadata
from fondant.pipeline import Pipeline

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -37,19 +37,6 @@ class DockerVolume:
target: str


@dataclass
class MetaData:
"""Dataclass representing the metadata arguments of a pipeline.

Args:
run_id: identifier of the current pipeline run
base_path: the base path used to store the artifacts.
"""

run_id: str
base_path: str


class DockerCompiler(Compiler):
"""Compiler that creates a docker-compose spec from a pipeline."""

Expand Down Expand Up @@ -90,13 +77,6 @@ def ignore_aliases(self, data):

logger.info(f"Successfully compiled to {output_path}")

@staticmethod
def _safe_component_name(component_name: str) -> str:
"""Transform a component name to a docker-compose friendly one.
eg: `Component A` -> `component_a`.
"""
return component_name.replace(" ", "_").lower()

@staticmethod
def _patch_path(base_path: str) -> t.Tuple[str, t.Optional[DockerVolume]]:
"""Helper that checks if the base_path is local or remote,
Expand Down Expand Up @@ -132,29 +112,33 @@ def _generate_spec(
"""Generate a docker-compose spec as a python dictionary,
loops over the pipeline graph to create services and their dependencies.
"""
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
path, volume = self._patch_path(base_path=pipeline.base_path)
run_id = f"{pipeline.name}-{timestamp}"
metadata = MetaData(run_id=run_id, base_path=path)
run_id = pipeline.get_run_id()

services = {}

pipeline.validate(run_id=run_id)

for component_name, component in pipeline._graph.items():
metadata = Metadata(
pipeline_name=pipeline.name,
run_id=run_id,
base_path=path,
component_id=component_name,
)

logger.info(f"Compiling service for {component_name}")
safe_component_name = self._safe_component_name(component_name)

component_op = component["fondant_component_op"]

# add metadata argument to command
command = ["--metadata", json.dumps(asdict(metadata))]
command = ["--metadata", metadata.to_json()]

# add in and out manifest paths to command
command.extend(
[
"--output_manifest_path",
f"{path}/{safe_component_name}/manifest.json",
f"{path}/{component_name}/manifest.json",
],
)

Expand All @@ -169,15 +153,14 @@ def _generate_spec(
depends_on = {}
if component["dependencies"]:
for dependency in component["dependencies"]:
safe_dependency = self._safe_component_name(dependency)
depends_on[safe_dependency] = {
depends_on[dependency] = {
"condition": "service_completed_successfully",
}
# there is only an input manifest if the component has dependencies
command.extend(
[
"--input_manifest_path",
f"{path}/{safe_dependency}/manifest.json",
f"{path}/{dependency}/manifest.json",
],
)

Expand All @@ -187,7 +170,7 @@ def _generate_spec(
if extra_volumes:
volumes.extend(extra_volumes)

services[safe_component_name] = {
services[component_name] = {
"command": command,
"depends_on": depends_on,
"volumes": volumes,
Expand All @@ -197,14 +180,12 @@ def _generate_spec(
logger.info(
f"Found Dockerfile for {component_name}, adding build step.",
)
services[safe_component_name]["build"] = {
services[component_name]["build"] = {
"context": str(component_op.component_dir),
"args": build_args,
}
else:
services[safe_component_name][
"image"
] = component_op.component_spec.image
services[component_name]["image"] = component_op.component_spec.image
return {
"name": pipeline.name,
"version": "3.8",
Expand Down Expand Up @@ -242,12 +223,21 @@ def compile(
pipeline: the pipeline to compile
output_path: the path where to save the Kubeflow pipeline spec
"""
run_id = pipeline.get_run_id()

@self.kfp.dsl.pipeline(name=pipeline.name, description=pipeline.description)
def kfp_pipeline():
previous_component_task = None
manifest_path = ""
for component_name, component in self.pipeline._graph.items():

for component_name, component in pipeline._graph.items():
metadata = Metadata(
pipeline_name=pipeline.name,
run_id=run_id,
base_path=pipeline.base_path,
component_id=component_name,
)

logger.info(f"Compiling service for {component_name}")

component_op = component["fondant_component_op"]
Expand All @@ -259,16 +249,10 @@ def kfp_pipeline():
# Execute the Kubeflow component and pass in the output manifest path from
# the previous component.
component_args = component_op.arguments
metadata = json.dumps(
{
"base_path": self.pipeline.base_path,
"run_id": "{{workflow.name}}",
},
)

component_task = kubeflow_component_op(
input_manifest_path=manifest_path,
metadata=metadata,
metadata=metadata.to_json(),
**component_args,
)
# Set optional configurations
Expand All @@ -287,7 +271,7 @@ def kfp_pipeline():
previous_component_task = component_task

self.pipeline = pipeline
self.pipeline.validate(run_id="{{workflow.name}}")
self.pipeline.validate(run_id=run_id)
logger.info(f"Compiling {self.pipeline.name} to {output_path}")

self.kfp.compiler.Compiler().compile(kfp_pipeline, output_path) # type: ignore
Expand Down
9 changes: 5 additions & 4 deletions src/fondant/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
)
from fondant.component_spec import Argument, ComponentSpec, kubeflow2python_type
from fondant.data_io import DaskDataLoader, DaskDataWriter
from fondant.manifest import Manifest
from fondant.manifest import Manifest, Metadata
from fondant.schema import validate_partition_number

logger = logging.getLogger(__name__)
Expand All @@ -46,7 +46,7 @@ def __init__(
self.spec = spec
self.input_manifest_path = input_manifest_path
self.output_manifest_path = output_manifest_path
self.metadata = metadata
self.metadata = Metadata.from_dict(metadata)
self.user_arguments = user_arguments
self.input_partition_rows = input_partition_rows

Expand Down Expand Up @@ -239,8 +239,9 @@ def optional_fondant_arguments() -> t.List[str]:
def _load_or_create_manifest(self) -> Manifest:
component_id = self.spec.name.lower().replace(" ", "_")
return Manifest.create(
base_path=self.metadata["base_path"],
run_id=self.metadata["run_id"],
pipeline_name=self.metadata.pipeline_name,
base_path=self.metadata.base_path,
run_id=self.metadata.run_id,
component_id=component_id,
)

Expand Down
48 changes: 42 additions & 6 deletions src/fondant/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pkgutil
import types
import typing as t
from dataclasses import asdict, dataclass
from pathlib import Path

import jsonschema.exceptions
Expand Down Expand Up @@ -70,6 +71,26 @@ def fields(self) -> t.Dict[str, Field]:
}


@dataclass
class Metadata:
    """Class representing the Metadata of the manifest.

    Attributes:
        base_path: the base path used to store pipeline artifacts
        pipeline_name: the name of the pipeline
        run_id: identifier of the current pipeline run
        component_id: identifier of the component that produced the manifest
    """

    base_path: str
    pipeline_name: str
    run_id: str
    component_id: str

    def to_dict(self):
        """Return the metadata as a plain dictionary keyed by field name."""
        return asdict(self)

    def to_json(self):
        """Serialize the metadata to a JSON string."""
        return json.dumps(asdict(self))

    @classmethod
    def from_dict(cls, data_dict):
        """Build a Metadata instance from a dictionary of field values."""
        return cls(**data_dict)


class Manifest:
"""
Class representing a Fondant manifest.
Expand Down Expand Up @@ -112,20 +133,31 @@ def retrieve_from_filesystem(uri: str) -> Resource:
raise InvalidManifest.create_from(e)

@classmethod
def create(cls, *, base_path: str, run_id: str, component_id: str) -> "Manifest":
def create(
cls,
*,
pipeline_name: str,
base_path: str,
run_id: str,
component_id: str,
) -> "Manifest":
"""Create an empty manifest.

Args:
pipeline_name: the name of the pipeline
PhilippeMoussalli marked this conversation as resolved.
Show resolved Hide resolved
base_path: The base path of the manifest
run_id: The id of the current pipeline run
component_id: The id of the current component being executed
"""
metadata = Metadata(
pipeline_name=pipeline_name,
base_path=base_path,
run_id=run_id,
component_id=component_id,
)

specification = {
"metadata": {
"base_path": base_path,
"run_id": run_id,
"component_id": component_id,
},
"metadata": metadata.to_dict(),
"index": {"location": f"/index/{run_id}/{component_id}"},
"subsets": {},
}
Expand Down Expand Up @@ -166,6 +198,10 @@ def run_id(self) -> str:
def component_id(self) -> str:
return self.metadata["component_id"]

    @property
    def pipeline_name(self) -> str:
        # Name of the pipeline this manifest belongs to, read from the
        # manifest's "metadata" section (required by the manifest JSON schema).
        return self.metadata["pipeline_name"]

@property
def index(self) -> Index:
return Index(self._specification["index"], base_path=self.base_path)
Expand Down
23 changes: 15 additions & 8 deletions src/fondant/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""This module defines classes to represent a Fondant Pipeline."""
import datetime
import hashlib
import json
import logging
Expand Down Expand Up @@ -57,12 +58,13 @@ def __init__(
node_pool_name: t.Optional[str] = None,
) -> None:
self.component_dir = Path(component_dir)
self.input_partition_rows = input_partition_rows
self.arguments = self._set_arguments(arguments)

self.component_spec = ComponentSpec.from_file(
self.component_dir / self.COMPONENT_SPEC_NAME,
)
self.name = self.component_spec.name.replace(" ", "_").lower()
self.input_partition_rows = input_partition_rows
self.arguments = self._set_arguments(arguments)

self.arguments.setdefault("component_spec", self.component_spec.specification)

self.number_of_gpus = number_of_gpus
Expand Down Expand Up @@ -238,11 +240,9 @@ def add_op(
msg,
)

dependencies_names = [
dependency.component_spec.name for dependency in dependencies
]
dependencies_names = [dependency.name for dependency in dependencies]

self._graph[task.component_spec.name] = {
self._graph[task.name] = {
"fondant_component_op": task,
"dependencies": dependencies_names,
}
Expand Down Expand Up @@ -279,11 +279,17 @@ def _validate_pipeline_name(pipeline_name: str) -> str:
raise InvalidPipelineDefinition(msg)
return pipeline_name

def get_run_id(self) -> str:
"""Get a unique run ID for the pipeline."""
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
return f"{self.name}-{timestamp}"

def validate(self, run_id: str):
"""Sort and run validation on the pipeline definition.

Args:
run_id (str, optional): run identifier. Defaults to None.
run_id: run identifier

"""
self.sort_graph()
self._validate_pipeline_definition(run_id)
Expand All @@ -310,6 +316,7 @@ def _validate_pipeline_definition(self, run_id: str):

# Create initial manifest
manifest = Manifest.create(
pipeline_name=self.name,
base_path=self.base_path,
run_id=run_id,
component_id=load_component_name,
Expand Down
4 changes: 4 additions & 0 deletions src/fondant/schemas/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
"type": "string",
"format": "uri"
},
"pipeline_name": {
"type": "string"
},
"run_id": {
"type": "string"
},
Expand All @@ -18,6 +21,7 @@
},
"required": [
"base_path",
"pipeline_name",
"run_id",
"component_id"
]
Expand Down
3 changes: 2 additions & 1 deletion tests/example_data/manifest.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
{
"metadata": {
"pipeline_name": "test_pipeline",
"base_path": "tests/example_data/subsets_input",
"run_id": "12345",
"run_id": "test_pipeline_12345",
"component_id": "67890"
},
"index": {
Expand Down
Loading