Remove pipeline references
mrchtr committed Apr 5, 2024
1 parent a5a8baa commit 5d091f9
Showing 3 changed files with 33 additions and 30 deletions.
src/fondant/dataset/compiler.py (13 changes: 7 additions & 6 deletions)
@@ -86,7 +86,7 @@ def _build_entrypoint(image: Image) -> t.List[str]:


class DockerCompiler(Compiler):
"""Compiler that creates a docker-compose spec from a pipeline."""
"""Compiler that creates a docker-compose spec from a dataset."""

def compile(
self,
@@ -98,7 +98,7 @@ def compile(
build_args: t.Optional[t.List[str]] = None,
auth_provider: t.Optional[CloudCredentialsMount] = None,
) -> None:
"""Compile a pipeline to docker-compose spec and save it to a specified output path.
"""Compile a dataset workflow to docker-compose spec and save it to a specified output path.
Args:
dataset: the dataset to compile
@@ -194,7 +194,7 @@ def _generate_spec(
build_args: t.List[str],
) -> dict:
"""Generate a docker-compose spec as a python dictionary,
- loops over the pipeline graph to create services and their dependencies.
+ loops over the dataset graph to create services and their dependencies.
"""
path, volume = self._patch_path(base_path=working_directory)
run_id = dataset.manifest.run_id
@@ -476,7 +476,7 @@ def __repr__(self) -> str:


class KubeFlowCompiler(Compiler):
"""Compiler that creates a Kubeflow pipeline spec from a pipeline."""
"""Compiler that creates a Kubeflow pipeline spec from a dataset."""

def __init__(self):
self._resolve_imports()
@@ -503,7 +503,8 @@ def compile(
working_directory: str,
output_path: str,
) -> None:
"""Compile a pipeline to Kubeflow pipeline spec and save it to a specified output path.
"""Compile a dataset workflow to Kubeflow pipeline spec and save it to a specified
output path.
Args:
dataset: the dataset to compile
@@ -874,7 +875,7 @@ def compile(
*,
role_arn: t.Optional[str] = None,
) -> None:
"""Compile a fondant pipeline to sagemaker pipeline spec and save it
"""Compile a fondant dataset workflow to sagemaker pipeline spec and save it
to a specified output path.
Args:
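For orientation (not part of this commit): a minimal sketch of how the renamed compiler docstrings above would be exercised. The keyword arguments are assumed from the signatures visible in these hunks (`dataset`, `working_directory`, `output_path`), and the import paths are inferred from the file layout of this diff, so treat both as assumptions rather than the canonical API.

```python
# Minimal sketch, assuming the compile(dataset, working_directory, output_path)
# signature suggested by the hunks above; import paths are inferred from this
# diff's file layout, not verified against the fondant package.
from fondant.dataset import Dataset
from fondant.dataset.compiler import DockerCompiler

def compile_to_compose(dataset: Dataset, working_directory: str) -> str:
    """Compile a Fondant dataset workflow to a local docker-compose spec."""
    output_path = ".fondant/compose.yaml"  # mirrors the default used by DockerRunner below
    DockerCompiler().compile(
        dataset=dataset,
        working_directory=working_directory,
        output_path=output_path,
    )
    return output_path
```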
src/fondant/dataset/dataset.py (26 changes: 14 additions & 12 deletions)
@@ -1,4 +1,4 @@
"""This module defines classes to represent a Fondant Pipeline."""
"""This module defines classes to represent a Fondant Dataset."""

import copy
import datetime
@@ -70,15 +70,16 @@ class Resources:

"""
Class representing the resources to assign to a Fondant Component operation in a Fondant
- Pipeline.
+ Dataset.
Arguments:
number_of_accelerators: The number of accelerators to assign to the operation (GPU, TPU)
accelerator_name: The name of the accelerator to assign. If you're using a cluster setup
on GKE, select "GPU" for GPU or "TPU" for TPU. Make sure
that you select a nodepool with the available hardware. If you're running the
- pipeline on Vertex, then select one of the machines specified in the list of
- accelerators here https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec.
+ dataset materialization workflow on Vertex, then select one of the machines specified
+ in the list of accelerators
+ here https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec.
node_pool_label: The label of the node pool to which the operation will be assigned.
node_pool_name: The name of the node pool to which the operation will be assigned.
cache: Set to False to disable caching, True by default.
@@ -119,8 +120,8 @@ def to_dict(self) -> t.Dict[str, t.Any]:

class ComponentOp:
"""
- Class representing an operation for a Fondant Component in a Fondant Pipeline. An operation
- is a representation of a function that will be executed as part of a pipeline.
+ Class representing an operation for a Fondant Component in a Fondant Dataset. An operation
+ is a representation of a function that will be executed as part of the workflow.
Arguments:
name_or_path: The name of a reusable component, or the path to the directory containing a
@@ -558,7 +559,7 @@ def create(

def sort_graph(self):
"""Sort the graph topologically based on task dependencies."""
logger.info("Sorting pipeline component graph topologically.")
logger.info("Sorting workflow graph topologically.")
sorted_graph = []
visited = set()

@@ -581,7 +582,7 @@ def depth_first_traversal(node: str):
self._graph = OrderedDict((node, self._graph[node]) for node in sorted_graph)

def validate(self):
"""Sort and run validation on the pipeline definition.
"""Sort and run validation on the dataset definition.
Args:
run_id: run identifier
@@ -603,10 +604,11 @@ def _validate_dataset_definition(self):
"""
run_id = self.manifest.run_id
if len(self._graph.keys()) == 0:
logger.info("No components defined in the pipeline. Nothing to validate.")
logger.info(
"No components defined in the dataset workflow. Nothing to validate.",
)
return

- # TODO: change later if we decide to run 2 fondant pipelines after each other
load_component = True
load_component_name = list(self._graph.keys())[0]

@@ -663,10 +665,10 @@ def _validate_dataset_definition(self):
)
load_component = False

logger.info("All pipeline component specifications match.")
logger.info("All workflow component specifications match.")

def __repr__(self) -> str:
"""Return a string representation of the FondantPipeline object."""
"""Return a string representation of the Fondant dataset object."""
return f"{self.__class__.__name__}({self._graph!r}"

@property
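The `sort_graph` and `depth_first_traversal` hunks above only touch log messages, but a standalone sketch of the DFS-based topological sort that the docstring describes may help. The graph shape (component name mapped to a set of dependency names) and the example component names are assumptions for illustration, not taken from this diff.

```python
# Standalone illustration of a depth-first topological sort, as described by the
# sort_graph docstring above; graph shape and names are assumed for the example.
from collections import OrderedDict

def sort_graph(graph):
    """Return the graph ordered so each component follows its dependencies."""
    sorted_nodes = []
    visited = set()

    def depth_first_traversal(node):
        if node in visited:
            return
        visited.add(node)
        for dependency in graph.get(node, set()):
            depth_first_traversal(dependency)
        sorted_nodes.append(node)  # emitted only after all of its dependencies

    for node in graph:
        depth_first_traversal(node)
    return OrderedDict((node, graph[node]) for node in sorted_nodes)

# Hypothetical three-component workflow: "write" depends on "embed", which depends on "load".
example = OrderedDict([("write", {"embed"}), ("embed", {"load"}), ("load", set())])
print(list(sort_graph(example)))  # ['load', 'embed', 'write']
```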
src/fondant/dataset/runner.py (24 changes: 12 additions & 12 deletions)
@@ -42,14 +42,14 @@ def _run(self, input_spec: str, *args, **kwargs):
"--remove-orphans",
]

print("Starting pipeline run...")
print("Starting workflow run...")

# copy the current environment with the DOCKER_DEFAULT_PLATFORM argument
subprocess.call( # nosec
cmd,
env=dict(os.environ, DOCKER_DEFAULT_PLATFORM="linux/amd64"),
)
print("Finished pipeline run.")
print("Finished workflow run.")

def run(
self,
@@ -60,7 +60,7 @@ def run(
build_args: t.Optional[t.List[str]] = None,
auth_provider: t.Optional[CloudCredentialsMount] = None,
) -> None:
"""Run a pipeline, either from a compiled docker-compose spec or from a fondant pipeline.
"""Run a workflow, either from a compiled docker-compose spec or from a fondant dataset.
Args:
dataset: the dataset to compile or a path to an already compiled docker-compose spec
@@ -78,7 +78,7 @@ def run(
os.makedirs(".fondant", exist_ok=True)
output_path = ".fondant/compose.yaml"
logging.info(
"Found reference to un-compiled pipeline... compiling",
"Found reference to un-compiled workflow... compiling",
)
compiler = DockerCompiler()
compiler.compile(
@@ -190,7 +190,7 @@ def run(
*,
experiment_name: str = "Default",
):
"""Run a pipeline, either from a compiled kubeflow spec or from a fondant pipeline.
"""Run a workflow, either from a compiled kubeflow spec or from a fondant dataset.
Args:
dataset: the dataset to compile or a path to an already compiled sagemaker spec
@@ -201,7 +201,7 @@ def run(
os.makedirs(".fondant", exist_ok=True)
output_path = ".fondant/kubeflow-pipeline.yaml"
logging.info(
"Found reference to un-compiled pipeline... compiling",
"Found reference to un-compiled workflow... compiling",
)
compiler = KubeFlowCompiler()
compiler.compile(
@@ -237,8 +237,8 @@ def _run(
pipeline_package_path=input_spec,
)

pipeline_url = f"{self.host}/#/runs/details/{runner.run_id}"
logger.info(f"Pipeline is running at: {pipeline_url}")
workflow_url = f"{self.host}/#/runs/details/{runner.run_id}"
logger.info(f"Pipeline is running at: {workflow_url}")

def get_name_from_spec(self, input_spec: str):
"""Get the name of the pipeline from the spec."""
@@ -274,7 +274,7 @@ def run(
dataset: t.Union[Dataset, str],
working_directory: str,
):
"""Run a pipeline, either from a compiled vertex spec or from a fondant pipeline.
"""Run a workflow, either from a compiled vertex spec or from a fondant dataset.
Args:
dataset: the dataset to compile or a path to an already compiled sagemaker spec
@@ -284,7 +284,7 @@ def run(
os.makedirs(".fondant", exist_ok=True)
output_path = ".fondant/vertex-pipeline.yaml"
logging.info(
"Found reference to un-compiled pipeline... compiling",
"Found reference to un-compiled workflow... compiling",
)
compiler = VertexCompiler()
compiler.compile(
@@ -344,15 +344,15 @@ def run(
Args:
dataset: the dataset to compile or a path to a already compiled sagemaker spec
working_directory: path of the working directory
- pipeline_name: the name of the pipeline to create
+ pipeline_name: the name of the workflow to create
role_arn: the Amazon Resource Name role to use for the processing steps,
if none provided the `sagemaker.get_execution_role()` role will be used.
"""
if isinstance(dataset, Dataset):
os.makedirs(".fondant", exist_ok=True)
output_path = ".fondant/sagemaker-pipeline.yaml"
logging.info(
"Found reference to un-compiled pipeline... compiling",
"Found reference to un-compiled workflow... compiling",
)
compiler = SagemakerCompiler()
compiler.compile(
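Finally, a hedged sketch of the dispatch these runner hunks describe: when `run` receives a `Dataset` instead of a path, it first compiles the workflow into a spec under `.fondant/` and then executes that spec. The function name, argument names, and import paths below are assumptions for illustration, not the fondant source.

```python
# Sketch of the compile-then-run dispatch shown in the DockerRunner hunks above;
# names and import paths are assumed, and this is not the fondant implementation.
import logging
import os

from fondant.dataset import Dataset
from fondant.dataset.compiler import DockerCompiler

def resolve_compose_spec(dataset, working_directory: str) -> str:
    """Return a path to a docker-compose spec, compiling the dataset first if needed."""
    if isinstance(dataset, Dataset):
        os.makedirs(".fondant", exist_ok=True)
        output_path = ".fondant/compose.yaml"
        logging.info("Found reference to un-compiled workflow... compiling")
        DockerCompiler().compile(
            dataset=dataset,
            working_directory=working_directory,
            output_path=output_path,
        )
        return output_path
    return dataset  # already a path to a compiled docker-compose spec
```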
