Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve ResourceSettings flexibility #2984

Draft
wants to merge 16 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 68 additions & 2 deletions src/zenml/config/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
from zenml.config.base_settings import BaseSettings, ConfigurationLevel
from zenml.config.pipeline_run_configuration import PipelineRunConfiguration
from zenml.config.pipeline_spec import PipelineSpec
from zenml.config.resource_settings import ResourceSettings
from zenml.config.settings_resolver import SettingsResolver
from zenml.config.stack_component_resource_settings import (
StackComponentResourceSettings,
)
from zenml.config.step_configurations import (
InputSpec,
Step,
Expand Down Expand Up @@ -483,6 +487,13 @@ def _compile_step_invocation(
complete_step_configuration = invocation.finalize(
parameters_to_ignore=parameters_to_ignore
)
self._warn_about_unused_resource_settings(
resource_settings=complete_step_configuration.resource_settings,
stack=stack,
invocation_id=invocation.id,
step_operator=complete_step_configuration.step_operator,
)

return Step(spec=step_spec, config=complete_step_configuration)

def _get_sorted_invocations(
Expand Down Expand Up @@ -552,7 +563,7 @@ def _ensure_required_stack_components_exist(
f"Step `{name}` requires step operator "
f"'{step_operator}' which is not configured in "
f"the stack '{stack.name}'. Available step operators: "
f"{available_step_operators}."
f"{list(available_step_operators)}."
)

experiment_tracker = step.config.experiment_tracker
Expand All @@ -564,7 +575,8 @@ def _ensure_required_stack_components_exist(
f"Step `{name}` requires experiment tracker "
f"'{experiment_tracker}' which is not "
f"configured in the stack '{stack.name}'. Available "
f"experiment trackers: {available_experiment_trackers}."
"experiment trackers: "
f"{list(available_experiment_trackers)}."
)

@staticmethod
Expand Down Expand Up @@ -604,6 +616,60 @@ def _compute_pipeline_spec(

return PipelineSpec(steps=step_specs, **additional_spec_args)

@staticmethod
def _warn_about_unused_resource_settings(
    resource_settings: "ResourceSettings",
    stack: "Stack",
    invocation_id: str,
    step_operator: Optional[str] = None,
) -> None:
    """Log a warning for resource settings that will not be applied.

    The settings class of the component that runs the step (the requested
    step operator, or the orchestrator if no step operator was requested)
    decides which resource settings keys are supported. Every configured
    (non-`None`) resource setting outside of that set is reported.

    Args:
        resource_settings: The resource settings for the step invocation.
        stack: The stack on which the step will run.
        invocation_id: ID of the step invocation.
        step_operator: The step operator used for the step invocation.
    """
    if not step_operator:
        settings_class = stack.orchestrator.settings_class
    else:
        active_operator = stack.step_operator
        if not active_operator or active_operator.name != step_operator:
            # The requested step operator is not part of the stack. This
            # will fail right after when we validate that the step operator
            # configuration is possible with the active stack, so there is
            # no need to log any warning here.
            return
        settings_class = active_operator.settings_class

    # Components that do not opt in via `StackComponentResourceSettings`
    # support no resource settings keys at all.
    supported_keys = set()
    if settings_class and issubclass(
        settings_class, StackComponentResourceSettings
    ):
        supported_keys = settings_class.get_allowed_resource_settings_keys()

    configured_values = resource_settings.model_dump(exclude_none=True)
    ignored_keys = [
        key for key in configured_values if key not in supported_keys
    ]

    if not ignored_keys:
        return

    # TODO: should this be a warning at runtime as well, to get more
    # awareness? And also to catch the case when users run from the
    # dashboard
    logger.warning(
        "Ignoring the following resource settings for step `%s` "
        "because your active %s does not support them: %s",
        invocation_id,
        "step operator" if step_operator else "orchestrator",
        ignored_keys,
    )


def convert_component_shortcut_settings_keys(
settings: Dict[str, "BaseSettings"], stack: "Stack"
Expand Down
21 changes: 19 additions & 2 deletions src/zenml/config/pipeline_configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

from pydantic import SerializeAsAny, field_validator

from zenml.config.constants import DOCKER_SETTINGS_KEY
from zenml.config.constants import DOCKER_SETTINGS_KEY, RESOURCE_SETTINGS_KEY
from zenml.config.retry_config import StepRetryConfig
from zenml.config.source import SourceWithValidator
from zenml.config.strict_base_model import StrictBaseModel
from zenml.model.model import Model

if TYPE_CHECKING:
from zenml.config import DockerSettings
from zenml.config import DockerSettings, ResourceSettings

from zenml.config.base_settings import BaseSettings, SettingsOrDict

Expand Down Expand Up @@ -73,6 +73,23 @@ def ensure_pipeline_name_allowed(cls, name: str) -> str:
)
return name

@property
def resource_settings(self) -> "ResourceSettings":
    """Resource settings of this pipeline configuration.

    The entry stored under `RESOURCE_SETTINGS_KEY` in `self.settings` may be
    a settings object or a plain dictionary; a missing entry is treated as
    an empty dictionary. Either form is validated into a `ResourceSettings`
    object.

    Returns:
        The resource settings of this pipeline configuration.
    """
    # Imported here rather than at module level, where the name is only
    # available under `TYPE_CHECKING`.
    from zenml.config import ResourceSettings

    configured: SettingsOrDict = self.settings.get(RESOURCE_SETTINGS_KEY, {})
    raw_values = (
        configured.model_dump()
        if isinstance(configured, BaseSettings)
        else configured
    )
    return ResourceSettings.model_validate(raw_values)

@property
def docker_settings(self) -> "DockerSettings":
"""Docker settings of this pipeline.
Expand Down
27 changes: 20 additions & 7 deletions src/zenml/config/resource_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
from enum import Enum
from typing import Optional, Union

from pydantic import Field, NonNegativeInt, PositiveFloat
from pydantic_settings import SettingsConfigDict
from pydantic import ConfigDict, Field, NonNegativeInt, PositiveFloat

from zenml.config.base_settings import BaseSettings
from zenml.enums import AcceleratorType
from zenml.utils.deprecation_utils import deprecate_pydantic_attributes


class ByteUnit(Enum):
Expand Down Expand Up @@ -65,14 +66,27 @@ class ResourceSettings(BaseSettings):

Attributes:
cpu_count: The amount of CPU cores that should be configured.
gpu_count: The amount of GPUs that should be configured.
memory: The amount of memory that should be configured.
accelerator: The accelerator to use.
accelerator_count: The amount of accelerators that should be configured.
instance_type: The type of instance to use.
"""

cpu_count: Optional[PositiveFloat] = None
gpu_count: Optional[NonNegativeInt] = None
memory: Optional[str] = Field(pattern=MEMORY_REGEX, default=None)

accelerator: Optional[Union[AcceleratorType, str]] = Field(
union_mode="left_to_right", default=None
)
accelerator_count: Optional[NonNegativeInt] = None
instance_type: Optional[str] = None

# DEPRECATED
gpu_count: Optional[NonNegativeInt] = None
_deprecation_validator = deprecate_pydantic_attributes(
("gpu_count", "accelerator_count")
)

@property
def empty(self) -> bool:
"""Returns if this object is "empty" (=no values configured) or not.
Expand Down Expand Up @@ -115,9 +129,8 @@ def get_memory(
# Should never happen due to the regex validation
raise ValueError(f"Unable to parse memory unit from '{memory}'.")

model_config = SettingsConfigDict(
model_config = ConfigDict(
# public attributes are immutable
frozen=True,
# prevent extra attributes during model initialization
extra="forbid",
extra="allow",
)
27 changes: 27 additions & 0 deletions src/zenml/config/stack_component_resource_settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from typing import Set

from zenml.config.base_settings import BaseSettings


class StackComponentResourceSettings(BaseSettings):
    """Parent class for stack component resource settings.

    Orchestrators and step operators should subclass this class and define which
    config/setting attributes are related to infrastructure resources and can
    be defined using the `ResourceSettings`.
    """

    @classmethod
    def get_allowed_resource_settings_keys(cls) -> Set[str]:
        """Get a set of keys that can be defined using the resource settings.

        The keys are the model fields of the most basic class in the MRO of
        `cls` that is a strict subclass of `StackComponentResourceSettings`.
        Fields that are only added further down the inheritance chain are
        therefore not part of the result.

        Returns:
            Set of keys that can be defined using the resource settings. The
            set is empty if no class in the MRO is a strict subclass of
            `StackComponentResourceSettings`.
        """
        # Walk the MRO from the most generic base towards `cls`, so that the
        # first match is the subclass closest to this parent class.
        for class_ in reversed(cls.__mro__):
            if class_ is not StackComponentResourceSettings and issubclass(
                class_, StackComponentResourceSettings
            ):
                return set(class_.model_fields)

        return set()
17 changes: 17 additions & 0 deletions src/zenml/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,3 +414,20 @@ class StackDeploymentProvider(StrEnum):
AWS = "aws"
GCP = "gcp"
AZURE = "azure"


class AcceleratorType(StrEnum):
    """Known accelerator types.

    The value of each member is identical to its name.
    """

    # Vertex
    K80 = "K80"
    P100 = "P100"
    V100 = "V100"
    P4 = "P4"
    T4 = "T4"
    A100 = "A100"  # A100 40GB
    A100_80GB = "A100_80GB"
    L4 = "L4"
    H100_80GB = "H100_80GB"
    TPU_V2 = "TPU_V2"
    TPU_V3 = "TPU_V3"
    TPU_V4_POD = "TPU_V4_POD"
    TPU_V5_LITEPOD = "TPU_V5_LITEPOD"
7 changes: 4 additions & 3 deletions src/zenml/integrations/azure/flavors/azureml.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,17 @@ class AzureMLComputeSettings(BaseSettings):

2. Compute instance:
- The `mode` is set to `compute-instance`.
- In this case, users have to provide a `compute-name`.
- In this case, users have to provide a `compute_name`.
- If a compute instance exists with this name, this instance
will be used and all the other parameters become irrelevant
and will throw a warning if set.
- If a compute instance does not already exist, ZenML will
create it. You can use the parameters `compute_size` and
create it. You can use the parameters `size` and
`idle_type_before_shutdown_minutes` for this operation.

3. Compute cluster:
- The `mode` is set to `compute-cluster`.
- In this case, users have to provide a `compute-name`.
- In this case, users have to provide a `compute_name`.
- If a compute cluster exists with this name, this instance
will be used and all the other parameters become irrelevant
and will throw a warning if set.
Expand All @@ -71,6 +71,7 @@ class AzureMLComputeSettings(BaseSettings):

# Common Configuration for Compute Instances and Clusters
compute_name: Optional[str] = None
# TODO: migrate to instance type
size: Optional[str] = None

# Additional configuration for a Compute Instance
Expand Down
84 changes: 69 additions & 15 deletions src/zenml/integrations/gcp/flavors/vertex_orchestrator_flavor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,15 @@
# permissions and limitations under the License.
"""Vertex orchestrator flavor."""

from typing import TYPE_CHECKING, Dict, Optional, Tuple, Type
from typing import TYPE_CHECKING, Dict, Optional, Tuple, Type, Union

from zenml.config.base_settings import BaseSettings
from pydantic import Field, NonNegativeInt, PositiveFloat

from zenml.config.resource_settings import MEMORY_REGEX
from zenml.config.stack_component_resource_settings import (
StackComponentResourceSettings,
)
from zenml.enums import AcceleratorType
from zenml.integrations.gcp import (
GCP_RESOURCE_TYPE,
GCP_VERTEX_ORCHESTRATOR_FLAVOR,
Expand All @@ -32,7 +38,66 @@
from zenml.integrations.gcp.orchestrators import VertexOrchestrator


class VertexOrchestratorSettings(BaseSettings):
class VertexOrchestratorResourceSettings(StackComponentResourceSettings):
    """Resource settings that can be applied to the Vertex orchestrator.

    More information on the available machine configurations can be found at
    https://cloud.google.com/vertex-ai/docs/pipelines/machine-types.

    Attributes:
        cpu_count: The amount of CPU cores that should be configured.
        memory: The amount of memory that should be configured.
        accelerator: The accelerator to use.
        accelerator_count: The amount of accelerators that should be configured.
    """

    cpu_count: Optional[PositiveFloat] = None
    memory: Optional[str] = Field(pattern=MEMORY_REGEX, default=None)

    accelerator: Optional[Union[AcceleratorType, str]] = Field(
        union_mode="left_to_right", default=None
    )
    accelerator_count: Optional[NonNegativeInt] = None

    def get_converted_accelerator(self) -> Optional[str]:
        """Get the accelerator value in the form that is passed to Vertex.

        Values that are not `AcceleratorType` members (raw strings or `None`)
        are returned unchanged. Enum members are translated using a fixed
        mapping.

        Raises:
            ValueError: If the accelerator is an `AcceleratorType` member
                without an entry in the mapping.

        Returns:
            The converted accelerator type.
        """
        requested = self.accelerator
        if not isinstance(requested, AcceleratorType):
            # Raw strings (and `None`) are passed through without conversion.
            return requested

        vertex_names = {
            AcceleratorType.P4: "NVIDIA_TESLA_P4",
            AcceleratorType.P100: "NVIDIA_TESLA_P100",
            AcceleratorType.V100: "NVIDIA_TESLA_V100",
            AcceleratorType.T4: "NVIDIA_TESLA_T4",
            AcceleratorType.L4: "NVIDIA_L4",
            AcceleratorType.A100: "NVIDIA_TESLA_A100",
            AcceleratorType.A100_80GB: "NVIDIA_A100_80GB",
            AcceleratorType.H100_80GB: "NVIDIA_H100_80GB",
            AcceleratorType.TPU_V2: "TPU_V2",
            AcceleratorType.TPU_V3: "TPU_V3",
        }

        if requested in vertex_names:
            return vertex_names[requested]

        raise ValueError(
            "Unsupported accelerator type for Vertex orchestrator: "
            f"{self.accelerator}. If you think this is a mistake and the "
            "accelerator is supported, please contact the ZenML team. "
            "To solve this issue in the meantime, you can always provide a "
            "raw string value for the accelerator that gets directly "
            "passed to Vertex."
        )


class VertexOrchestratorSettings(VertexOrchestratorResourceSettings):
"""Settings for the Vertex orchestrator.

Attributes:
Expand All @@ -41,18 +106,7 @@ class VertexOrchestratorSettings(BaseSettings):
the client returns immediately and the pipeline is executed
asynchronously. Defaults to `True`.
labels: Labels to assign to the pipeline job.
node_selector_constraint: Each constraint is a key-value pair label.
For the container to be eligible to run on a node, the node must have
each of the constraints appeared as labels.
For example a GPU type can be providing by one of the following tuples:
- ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_A100")
- ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_K80")
- ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_P4")
- ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_P100")
- ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_T4")
- ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_V100")
Hint: the selected region (location) must provide the requested accelerator
(see https://cloud.google.com/compute/docs/gpus/gpu-regions-zones).
node_selector_constraint: DEPRECATED. Use `accelerator` instead.
pod_settings: Pod settings to apply.
"""

Expand Down
Loading