
Commit

add mpi and fix requirements.txt
Yubo Wang committed May 16, 2023
1 parent 5ac1eca commit 9696dc3
Showing 16 changed files with 810 additions and 174 deletions.
65 changes: 64 additions & 1 deletion plugins/flytekit-kf-mpi/README.md
@@ -8,4 +8,67 @@ To install the plugin, run the following command:
pip install flytekitplugins-kfmpi
```

_Example coming soon!_
## Code Example
MPI usage:
```python
from flytekit import Resources, task
from flytekitplugins.kfmpi import Launcher, MPIJob, Worker


@task(
task_config=MPIJob(
launcher=Launcher(
replicas=1,
),
worker=Worker(
replicas=5,
requests=Resources(cpu="2", mem="2Gi"),
limits=Resources(cpu="4", mem="2Gi"),
),
slots=2,
),
cache=True,
requests=Resources(cpu="1"),
cache_version="1",
)
def my_mpi_task(x: int, y: str) -> int:
return x
```
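The task configuration added in this release also accepts a `RunPolicy`. The sketch below shows how one might attach it to an `MPIJob`; the field names follow the `RunPolicy` and `CleanPodPolicy` definitions introduced in this commit, and the specific values are illustrative only:

```python
from flytekitplugins.kfmpi import CleanPodPolicy, MPIJob, RunPolicy, Worker

# A sketch of attaching a run policy to an MPIJob (illustrative values).
mpi_config = MPIJob(
    worker=Worker(replicas=5),
    slots=2,
    run_policy=RunPolicy(
        clean_pod_policy=CleanPodPolicy.ALL,  # remove all pods once the job finishes
        ttl_seconds_after_finished=600,       # keep the finished job object for 10 minutes
        active_deadline_seconds=3600,         # terminate the job if it runs longer than an hour
        backoff_limit=3,                      # retries before the job is marked failed
    ),
)
```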


Horovod usage:
You can override the command of a replica group as follows:
```python
from flytekit import Resources, task
from flytekitplugins.kfmpi import HorovodJob, Launcher, RestartPolicy, Worker


@task(
task_config=HorovodJob(
launcher=Launcher(
replicas=1,
requests=Resources(cpu="1"),
limits=Resources(cpu="2"),
),
worker=Worker(
replicas=1,
command=["/usr/sbin/sshd", "-De", "-f", "/home/jobuser/.sshd_config"],
restart_policy=RestartPolicy.NEVER,
),
slots=2,
verbose=False,
log_level="INFO",
),
)
def my_horovod_task():
...
```
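Under the hood, the worker replica count and `slots` determine the total number of MPI processes passed to `horovodrun`. The snippet below is a simplified sketch of how the launch prefix is assembled (based on `_get_horovod_prefix` in this commit; some flags visible in the diff, such as `--min-np`, are omitted):

```python
from typing import List


# Simplified sketch of the horovodrun prefix; not the plugin's exact implementation.
def horovod_prefix(worker_replicas: int, slots: int, verbose: bool = False,
                   log_level: str = "INFO",
                   discovery_script_path: str = "/etc/mpi/discover_hosts.sh") -> List[str]:
    np = worker_replicas * slots  # total number of MPI processes
    cmd = ["horovodrun", "-np", f"{np}"]
    if verbose:
        cmd.append("--verbose")
    cmd += [
        "--log-level", log_level,
        "--network-interface", "eth0",
        "--slots-per-host", f"{slots}",
        "--host-discovery-script", discovery_script_path,
    ]
    return cmd
```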

## Upgrade MPI Plugin from V0 to V1
The MPI plugin has been updated from v0 to v1 to enable more configuration options.
To migrate from v0 to v1, change the following (a fuller before/after sketch follows these steps):
1. Update flytepropeller to v1.6.0
2. Update flytekit version to v1.6.2
3. Update your code from:
```python
task_config=MPIJob(num_workers=10),
```
to
```python
task_config=MPIJob(worker=Worker(replicas=10)),
```
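
For example, a v0 configuration that sets the legacy fields maps onto the v1 replica groups roughly as follows (a sketch; the legacy `num_workers` and `num_launcher_replicas` fields are still accepted for backwards compatibility):

```python
from flytekitplugins.kfmpi import Launcher, MPIJob, Worker

# v0 style (still accepted, but deprecated)
old_config = MPIJob(num_workers=10, num_launcher_replicas=1, slots=2)

# v1 style: replica counts move into the Worker and Launcher replica groups
new_config = MPIJob(
    worker=Worker(replicas=10),
    launcher=Launcher(replicas=1),
    slots=2,
)
```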
2 changes: 1 addition & 1 deletion plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/__init__.py
@@ -10,4 +10,4 @@
MPIJob
"""

from .task import HorovodJob, MPIJob
from .task import HorovodJob, MPIJob, Worker, Launcher, CleanPodPolicy, RunPolicy, RestartPolicy
226 changes: 150 additions & 76 deletions plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/task.py
@@ -2,62 +2,82 @@
This plugin adds the capability of running distributed MPI training to Flyte using backend plugins, natively on
Kubernetes. It leverages the `MPI Job <https://github.com/kubeflow/mpi-operator>`_ plugin from Kubeflow.
"""
from dataclasses import dataclass
from typing import Any, Callable, Dict, List
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, Optional, Union, List

from flyteidl.plugins import mpi_pb2 as _mpi_task
from flyteidl.plugins.kubeflow import mpi_pb2 as mpi_task
from google.protobuf.json_format import MessageToDict

from flytekit import PythonFunctionTask
from flytekit import PythonFunctionTask, Resources
from flytekit.core.resources import convert_resources_to_resource_model
from flytekit.configuration import SerializationSettings
from flytekit.extend import TaskPlugins
from flytekit.models import common as _common
from flyteidl.plugins.kubeflow import common_pb2 as kubeflow_common


class MPIJobModel(_common.FlyteIdlEntity):
"""Model definition for MPI the plugin
Args:
num_workers: integer determining the number of worker replicas spawned in the cluster for this job
(in addition to 1 master).
num_launcher_replicas: Number of launcher server replicas to use
slots: Number of slots per worker used in hostfile
.. note::
Please use resources=Resources(cpu="1"...) to specify per worker resource
@dataclass
class RestartPolicy(Enum):
"""
RestartPolicy describes how the replicas should be restarted
"""

def __init__(self, num_workers, num_launcher_replicas, slots):
self._num_workers = num_workers
self._num_launcher_replicas = num_launcher_replicas
self._slots = slots

@property
def num_workers(self):
return self._num_workers
ALWAYS = kubeflow_common.RESTART_POLICY_ALWAYS
FAILURE = kubeflow_common.RESTART_POLICY_ON_FAILURE
NEVER = kubeflow_common.RESTART_POLICY_NEVER

@property
def num_launcher_replicas(self):
return self._num_launcher_replicas
@dataclass
class CleanPodPolicy(Enum):
"""
CleanPodPolicy describes how to deal with pods when the job is finished.
"""

@property
def slots(self):
return self._slots
NONE = kubeflow_common.CLEANPOD_POLICY_NONE
ALL = kubeflow_common.CLEANPOD_POLICY_ALL
RUNNING = kubeflow_common.CLEANPOD_POLICY_RUNNING

def to_flyte_idl(self):
return _mpi_task.DistributedMPITrainingTask(
num_workers=self.num_workers, num_launcher_replicas=self.num_launcher_replicas, slots=self.slots
)
@dataclass
class RunPolicy:
"""
RunPolicy describes a policy to apply to the execution of a Kubeflow job.
Args:
clean_pod_policy: The policy for cleaning up pods after the job completes. Defaults to None.
ttl_seconds_after_finished (int): The TTL for cleaning up finished jobs.
active_deadline_seconds (int): The duration (in seconds) since startTime during which the job
can remain active before it is terminated. Must be a positive integer. This setting applies only to pods
where restartPolicy is OnFailure or Always.
backoff_limit (int): The number of retries before marking this job as failed.
"""
clean_pod_policy: Optional[CleanPodPolicy] = None
ttl_seconds_after_finished: Optional[int] = None
active_deadline_seconds: Optional[int] = None
backoff_limit: Optional[int] = None

@classmethod
def from_flyte_idl(cls, pb2_object):
return cls(
num_workers=pb2_object.num_workers,
num_launcher_replicas=pb2_object.num_launcher_replicas,
slots=pb2_object.slots,
)
@dataclass
class Worker:
"""
Worker replica configuration. The worker command can be customized; if not specified, the worker uses the
default command generated by the MPI operator.
"""
command: Optional[List[str]] = None
image: Optional[str] = None
requests: Optional[Resources] = None
limits: Optional[Resources] = None
replicas: Optional[int] = 1
restart_policy: Optional[RestartPolicy] = None


@dataclass
class Launcher:
"""
Launcher replica configuration. The launcher command can be customized; if not specified, the launcher uses the
command specified in the task signature.
"""
command: Optional[List[str]] = None
image: Optional[str] = None
requests: Optional[Resources] = None
limits: Optional[Resources] = None
replicas: Optional[int] = 1
restart_policy: Optional[RestartPolicy] = None


@dataclass
@@ -67,18 +87,20 @@ class MPIJob(object):
to run distributed training on k8s with MPI
Args:
num_workers: integer determining the number of worker replicas spawned in the cluster for this job
(in addition to 1 master).
num_launcher_replicas: Number of launcher server replicas to use
slots: Number of slots per worker used in hostfile
launcher: Configuration for the launcher replica group.
worker: Configuration for the worker replica group.
run_policy: Configuration for the run policy.
slots: The number of slots per worker used in the hostfile.
num_launcher_replicas: [DEPRECATED] The number of launcher server replicas to use. Please use launcher.replicas instead.
num_workers: [DEPRECATED] The number of worker replicas to spawn in the cluster for this job. Please use worker.replicas instead.
"""

slots: int
num_launcher_replicas: int = 1
num_workers: int = 1
launcher: Launcher = field(default_factory=lambda: Launcher())
worker: Worker = field(default_factory=lambda: Worker())
run_policy: Optional[RunPolicy] = field(default_factory=lambda: None)
slots: int = 1
# Support v0 config for backwards compatibility
num_launcher_replicas: Optional[int] = None
num_workers: Optional[int] = None


class MPIFunctionTask(PythonFunctionTask[MPIJob]):
@@ -116,65 +138,117 @@ def __init__(self, task_config: MPIJob, task_function: Callable, **kwargs):
task_type=self._MPI_JOB_TASK_TYPE,
**kwargs,
)

def _convert_replica_spec(self, replica_config: Union[Launcher, Worker]) -> mpi_task.DistributedMPITrainingReplicaSpec:
resources = convert_resources_to_resource_model(requests=replica_config.requests, limits=replica_config.limits)
return mpi_task.DistributedMPITrainingReplicaSpec(
command=replica_config.command,
replicas=replica_config.replicas,
image=replica_config.image,
resources=resources.to_flyte_idl() if resources else None,
restart_policy=replica_config.restart_policy.value if replica_config.restart_policy else None,
)

def _convert_run_policy(self, run_policy: RunPolicy) -> kubeflow_common.RunPolicy:
return kubeflow_common.RunPolicy(
clean_pod_policy=run_policy.clean_pod_policy.value if run_policy.clean_pod_policy else None,
ttl_seconds_after_finished=run_policy.ttl_seconds_after_finished,
active_deadline_seconds=run_policy.active_deadline_seconds,
backoff_limit=run_policy.backoff_limit,
)

def _get_base_command(self, settings: SerializationSettings) -> List[str]:
return super().get_command(settings)

def get_command(self, settings: SerializationSettings) -> List[str]:
cmd = super().get_command(settings)
num_procs = self.task_config.num_workers * self.task_config.slots
cmd = self._get_base_command(settings)
if self.task_config.num_workers:
num_workers = self.task_config.num_workers
else:
num_workers = self.task_config.worker.replicas
num_procs = num_workers * self.task_config.slots
mpi_cmd = self._MPI_BASE_COMMAND + ["-np", f"{num_procs}"] + ["python", settings.entrypoint_settings.path] + cmd
# the hostfile is set automatically by MPIOperator using env variable OMPI_MCA_orte_default_hostfile
return mpi_cmd

def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
job = MPIJobModel(
num_workers=self.task_config.num_workers,
num_launcher_replicas=self.task_config.num_launcher_replicas,
worker = self._convert_replica_spec(self.task_config.worker)
if self.task_config.num_workers:
worker.replicas = self.task_config.num_workers

launcher = self._convert_replica_spec(self.task_config.launcher)
if self.task_config.num_launcher_replicas:
launcher.replicas = self.task_config.num_launcher_replicas

run_policy = self._convert_run_policy(self.task_config.run_policy) if self.task_config.run_policy else None
mpi_job = mpi_task.DistributedMPITrainingTask(
worker_replicas=worker,
launcher_replicas=launcher,
slots=self.task_config.slots,
run_policy=run_policy,
)
return MessageToDict(job.to_flyte_idl())
return MessageToDict(mpi_job)


@dataclass
class HorovodJob(object):
slots: int
num_launcher_replicas: int = 1
num_workers: int = 1
"""
Configuration for an executable `Horovod job using the MPI operator <https://github.com/kubeflow/mpi-operator>`_. Use this
to run distributed training on k8s with MPI. For more info, check out `Running Horovod <https://horovod.readthedocs.io/en/stable/summary_include.html#running-horovod>`_.
Args:
worker: Worker configuration for the job.
launcher: Launcher configuration for the job.
run_policy: Configuration for the run policy.
slots: Number of slots per worker used in hostfile (default: 1).
verbose: Optional flag indicating whether to enable verbose logging (default: False).
log_level: Optional string specifying the log level (default: "INFO").
discovery_script_path: Path to the discovery script used for host discovery (default: "/etc/mpi/discover_hosts.sh").
num_launcher_replicas: [DEPRECATED] The number of launcher server replicas to use. Please use launcher.replicas instead.
num_workers: [DEPRECATED] The number of worker replicas to spawn in the cluster for this job. Please use worker.replicas instead.
"""

worker: Worker = field(default_factory=lambda: Worker())
launcher: Launcher = field(default_factory=lambda: Launcher())
run_policy: Optional[RunPolicy] = field(default_factory=lambda: None)
slots: int = 1
verbose: Optional[bool] = False
log_level: Optional[str] = "INFO"
discovery_script_path: Optional[str] = "/etc/mpi/discover_hosts.sh"
# Support v0 config for backwards compatibility
num_launcher_replicas: Optional[int] = None
num_workers: Optional[int] = None

class HorovodFunctionTask(MPIFunctionTask):
"""
For more info, check out https://github.com/horovod/horovod
"""

# Customize your setup here. Please ensure the cmd, path, volume, etc are available in the pod.
ssh_command = "/usr/sbin/sshd -De -f /home/jobuser/.sshd_config"
discovery_script_path = "/etc/mpi/discover_hosts.sh"

def __init__(self, task_config: HorovodJob, task_function: Callable, **kwargs):

super().__init__(
task_config=task_config,
task_function=task_function,
**kwargs,
)

def get_command(self, settings: SerializationSettings) -> List[str]:
cmd = super().get_command(settings)
cmd = self._get_base_command(settings)
mpi_cmd = self._get_horovod_prefix() + cmd
return mpi_cmd

def get_config(self, settings: SerializationSettings) -> Dict[str, str]:
config = super().get_config(settings)
return {**config, "worker_spec_command": self.ssh_command}

def _get_horovod_prefix(self) -> List[str]:
np = self.task_config.num_workers * self.task_config.slots
np = self.task_config.worker.replicas * self.task_config.slots
verbose = "--verbose" if self.task_config.verbose is True else ""
log_level = self.task_config.log_level
base_cmd = [
"horovodrun",
"-np",
f"{np}",
"--verbose",
f"{verbose}",
"--log-level",
"INFO",
f"{log_level}",
"--network-interface",
"eth0",
"--min-np",
@@ -184,7 +258,7 @@ def _get_horovod_prefix(self) -> List[str]:
"--slots-per-host",
f"{self.task_config.slots}",
"--host-discovery-script",
self.discovery_script_path,
self.task_config.discovery_script_path,
]
return base_cmd

