Add componentOp warning for unused configuration (#551)
Addresses #518 

Separating `ComponentOp` into runner-specific classes is not ideal, since it would require changing class names when switching runners. Instead, it's better to log a warning when a configuration is set but not valid for the chosen runner (e.g. a node pool for the Docker runner). This PR modifies the `_set_configuration` method of the compilers to handle this.
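For illustration, here is roughly what this looks like from a user's perspective. The pipeline below is hypothetical (component directory and `compile` call are assumptions, not taken from this diff), but the warning format matches the `log_unused_configurations` method added here:

```python
from fondant.pipeline import ComponentOp, Pipeline, Resources
from fondant.pipeline.compiler import DockerCompiler

pipeline = Pipeline(pipeline_name="example pipeline", base_path="fs://bucket")

# node_pool_* settings only make sense on Kubeflow, but we compile for Docker.
caption_images_op = ComponentOp(
    "components/caption_images",  # hypothetical component directory
    resources=Resources(
        accelerator_number=1,
        accelerator_name="GPU",
        node_pool_label="node_pool",
        node_pool_name="n2-standard-128-pool",
    ),
)
pipeline.add_op(caption_images_op)

# Compiling with the Docker runner should now warn rather than fail:
#   Configuration `node_pool_label` is set with `node_pool` but has no effect
#   for runner `DockerCompiler`.
#   Configuration `node_pool_name` is set with `n2-standard-128-pool` but has
#   no effect for runner `DockerCompiler`.
DockerCompiler().compile(pipeline, output_path="docker-compose.yml")
```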
PhilippeMoussalli authored Nov 6, 2023
1 parent 06493c9 commit cfb01c7
Showing 6 changed files with 175 additions and 183 deletions.
37 changes: 22 additions & 15 deletions docs/pipeline.md
@@ -13,7 +13,7 @@ For example, if a component requires GPU for model inference, you can specify th

Here is an example of how to build a pipeline using Fondant:
```python
-from fondant.pipeline import ComponentOp, Pipeline
+from fondant.pipeline import ComponentOp, Pipeline, Resources

pipeline = Pipeline(pipeline_name="example pipeline", base_path="fs://bucket")

@@ -28,9 +28,11 @@ caption_images_op = ComponentOp(
"model_id": "Salesforce/blip-image-captioning-base",
"batch_size": 2,
"max_new_tokens": 50,
},
number_of_accelerators=1,
accelerator_name="GPU",
},
resources=Resources(
accelerator_number=1,
accelerator_name="GPU",
)
)

pipeline.add_op(load_from_hub_op)
@@ -154,9 +156,12 @@ component = ComponentOp(
    arguments={
        ...,
    },
-    number_of_accelerators=1,
-    accelerator_name="GPU",
+    resources=Resources(
+        accelerator_number=1,
+        accelerator_name="GPU",
+    )
)

```

</td>
@@ -168,10 +173,12 @@ component = ComponentOp(
    arguments={
        ...,
    },
-    number_of_accelerators=1,
-    accelerator_name="NVIDIA_TESLA_K80",
-    memory_limit="512M",
-    cpu_limit="4",
+    resources=Resources(
+        accelerator_number=1,
+        accelerator_name="NVIDIA_TESLA_K80",
+        memory_limit="512M",
+        cpu_limit="4",
+    )
)
```

@@ -184,11 +191,11 @@ component = ComponentOp(
    arguments={
        ...,
    },
-    number_of_accelerators=1,
-    accelerator_name="GPU",
-    node_pool_label="node_pool",
-    node_pool_name="n2-standard-128-pool",
-    preemptible = True
+    resources=Resources(
+        accelerator_number=1,
+        accelerator_name="GPU",
+        node_pool_label="node_pool",
+        node_pool_name="n2-standard-128-pool",
+    )
```

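Note: the compiler changes below rely on a `Resources.to_dict()` helper that is not shown in this diff. A minimal sketch of what such a dataclass could look like, inferred from the fields used in the examples above (hypothetical, not Fondant's actual definition):

```python
import typing as t
from dataclasses import asdict, dataclass


@dataclass
class Resources:
    """Hypothetical sketch of the Resources spec used above.

    Fields that are set but have no effect for the chosen runner are
    reported by `log_unused_configurations` after this PR; fields left
    as None are simply skipped.
    """

    accelerator_number: t.Optional[int] = None
    accelerator_name: t.Optional[str] = None
    node_pool_label: t.Optional[str] = None
    node_pool_name: t.Optional[str] = None
    cpu_request: t.Optional[str] = None
    cpu_limit: t.Optional[str] = None
    memory_request: t.Optional[str] = None
    memory_limit: t.Optional[str] = None

    def to_dict(self) -> t.Dict[str, t.Any]:
        """Return all fields as a plain dict, as the compilers below expect."""
        return asdict(self)
```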
1 change: 1 addition & 0 deletions src/fondant/pipeline/__init__.py
@@ -1,6 +1,7 @@
from .pipeline import ( # noqa
    ComponentOp,
    Pipeline,
+    Resources,
    VALID_ACCELERATOR_TYPES,
    VALID_VERTEX_ACCELERATOR_TYPES,
)
68 changes: 45 additions & 23 deletions src/fondant/pipeline/compiler.py
@@ -32,6 +32,15 @@ def compile(self, *args, **kwargs) -> None:
    def _set_configuration(self, *args, **kwargs) -> None:
        """Abstract method to set pipeline configuration."""

+    def log_unused_configurations(self, **kwargs):
+        """Log configurations that are set but will be unused."""
+        for config_name, config_value in kwargs.items():
+            if config_value is not None:
+                logger.warning(
+                    f"Configuration `{config_name}` is set with `{config_value}` but has no effect"
+                    f" for runner `{self.__class__.__name__}`.",
+                )


@dataclass
class DockerVolume:
@@ -222,10 +231,14 @@ def _generate_spec(
"services": services,
}

-    @staticmethod
-    def _set_configuration(services, fondant_component_operation, component_name):
-        accelerator_name = fondant_component_operation.accelerator_name
-        accelerator_number = fondant_component_operation.number_of_accelerators
+    def _set_configuration(self, services, fondant_component_operation, component_name):
+        resources_dict = fondant_component_operation.resources.to_dict()
+
+        accelerator_name = resources_dict.pop("accelerator_name")
+        accelerator_number = resources_dict.pop("accelerator_number")
+
+        # Unused configurations
+        self.log_unused_configurations(**resources_dict)

        if accelerator_name is not None:
            if accelerator_name not in VALID_ACCELERATOR_TYPES:
@@ -402,15 +415,20 @@ def kfp_pipeline():
        logger.info("Pipeline compiled successfully")

    def _set_configuration(self, task, fondant_component_operation):
-        # Unpack optional specifications
-        number_of_accelerators = fondant_component_operation.number_of_accelerators
-        accelerator_name = fondant_component_operation.accelerator_name
-        node_pool_label = fondant_component_operation.node_pool_label
-        node_pool_name = fondant_component_operation.node_pool_name
-        cpu_request = fondant_component_operation.cpu_request
-        cpu_limit = fondant_component_operation.cpu_limit
-        memory_request = fondant_component_operation.memory_request
-        memory_limit = fondant_component_operation.memory_limit
+        # Used configurations
+        resources_dict = fondant_component_operation.resources.to_dict()
+
+        accelerator_number = resources_dict.pop("accelerator_number")
+        accelerator_name = resources_dict.pop("accelerator_name")
+        node_pool_label = resources_dict.pop("node_pool_label")
+        node_pool_name = resources_dict.pop("node_pool_name")
+        cpu_request = resources_dict.pop("cpu_request")
+        cpu_limit = resources_dict.pop("cpu_limit")
+        memory_request = resources_dict.pop("memory_request")
+        memory_limit = resources_dict.pop("memory_limit")
+
+        # Unused configurations
+        self.log_unused_configurations(**resources_dict)

# Assign optional specification
if cpu_request is not None:
@@ -429,7 +447,7 @@ def _set_configuration(self, task, fondant_component_operation):
)
raise InvalidPipelineDefinition(msg)

-            task.set_accelerator_limit(number_of_accelerators)
+            task.set_accelerator_limit(accelerator_number)
if accelerator_name == "GPU":
task.set_accelerator_type("nvidia.com/gpu")
elif accelerator_name == "TPU":
@@ -462,21 +480,25 @@ def resolve_imports(self):
msg,
)

-    @staticmethod
-    def _set_configuration(task, fondant_component_operation):
-        # Unpack optional specifications
-        cpu_limit = fondant_component_operation.cpu_limit
-        memory_limit = fondant_component_operation.memory_limit
-        number_of_accelerators = fondant_component_operation.number_of_accelerators
-        accelerator_name = fondant_component_operation.accelerator_name
+    def _set_configuration(self, task, fondant_component_operation):
+        # Used configurations
+        resources_dict = fondant_component_operation.resources.to_dict()
+
+        cpu_limit = resources_dict.pop("cpu_limit")
+        memory_limit = resources_dict.pop("memory_limit")
+        accelerator_number = resources_dict.pop("accelerator_number")
+        accelerator_name = resources_dict.pop("accelerator_name")
+
+        # Unused configurations
+        self.log_unused_configurations(**resources_dict)

        # Assign optional specification
        if cpu_limit is not None:
            task.set_cpu_limit(cpu_limit)
        if memory_limit is not None:
            task.set_memory_limit(memory_limit)
-        if number_of_accelerators is not None:
-            task.set_accelerator_limit(number_of_accelerators)
+        if accelerator_number is not None:
+            task.set_accelerator_limit(accelerator_number)
if accelerator_name not in VALID_VERTEX_ACCELERATOR_TYPES:
msg = (
f"Configured accelerator `{accelerator_name}` is not a valid accelerator type"
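To see the new warning path in isolation, here is a small self-contained sketch (stand-in classes, not the real ones; only the message logic is copied from the `log_unused_configurations` method added above):

```python
import logging

logger = logging.getLogger(__name__)


class Compiler:
    """Minimal stand-in for fondant's Compiler base class (sketch only)."""

    def log_unused_configurations(self, **kwargs):
        # Same logic as the method added in this PR: warn for every
        # configuration that is set (not None) but ignored by this runner.
        for config_name, config_value in kwargs.items():
            if config_value is not None:
                logger.warning(
                    f"Configuration `{config_name}` is set with `{config_value}` but has no effect"
                    f" for runner `{self.__class__.__name__}`.",
                )


class DockerCompiler(Compiler):
    pass


if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)
    # What could remain in resources_dict after the Docker compiler pops the
    # accelerator keys; node pools are Kubeflow-only, so a warning is logged.
    DockerCompiler().log_unused_configurations(
        node_pool_name="n2-standard-128-pool",
        cpu_limit=None,  # None values are silently skipped
    )
    # -> Configuration `node_pool_name` is set with `n2-standard-128-pool`
    #    but has no effect for runner `DockerCompiler`.
```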