Skip to content

Commit

Permalink
Added mpi plugin (flyteorg#595)
Browse files Browse the repository at this point in the history
* Added mpi plugin

Signed-off-by: Yuvraj <[email protected]>

Co-authored-by: Ketan Umare <[email protected]>
Signed-off-by: Robert Everson <[email protected]>
  • Loading branch information
2 people authored and Robert Everson committed May 27, 2022
1 parent 9f5aa51 commit 8f6de3b
Show file tree
Hide file tree
Showing 12 changed files with 251 additions and 11 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ jobs:
- flytekit-greatexpectations
- flytekit-hive
- flytekit-k8s-pod
- flytekit-kf-mpi
- flytekit-kf-pytorch
- flytekit-kf-tensorflow
- flytekit-spark
Expand Down
21 changes: 11 additions & 10 deletions plugins/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ All Flytekit plugins maintained by the core team are added here. It is not neces

| Plugin | Installation | Description | Version | Type |
|------------------------------|------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------|---------------|
| AWS Sagemaker Training | ```bash pip install flytekitplugins-awssagemaker ``` | Installs SDK to author Sagemaker built-in and custom training jobs in Python | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-awssagemaker/) | Backend |
| Hive Queries | ```bash pip install flytekitplugins-hive ``` | Installs SDK to author Hive Queries that can be executed on a configured Hive backend using Flyte backend plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-hive/) | Backend |
| K8s Distributed PyTorch Jobs | ```bash pip install flytekitplugins-kfpytorch ``` | Installs SDK to author Distributed PyTorch Jobs in Python using Kubeflow PyTorch Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-kfpytorch/) | Backend |
| K8s Native TensorFlow Jobs | ```bash pip install flytekitplugins-kftensorflow ``` | Installs SDK to author Distributed TensorFlow Jobs in Python using Kubeflow Tensorflow Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-kftensorflow/) | Backend |
| Papermill-based Tasks | ```bash pip install flytekitplugins-papermill ``` | Execute entire notebooks as Flyte Tasks; ensure communication between tasks and notebooks | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-papermill/) | Flytekit-only |
| Pod Tasks | ```bash pip install flytekitplugins-pod ``` | Installs SDK to author Pods in Python. These pods can have multiple containers, use volumes and have non exiting side-cars | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-pod/) | Flytekit-only |
| Spark | ```bash pip install flytekitplugins-spark ``` | Installs SDK to author Spark jobs that can be executed natively on Kubernetes with a supported backend Flyte plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-spark/) | Backend |
| AWS Athena Queries | ```bash pip install flytekitplugins-athena ``` | Installs SDK to author queries to execute them on AWS Athena | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-athena/) | Backend |
| Dolt | ```bash pip install flytekitplugins-dolt ``` | Read & write Dolt data sets and use Dolt tables as native types | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-dolt/) | Flytekit-only |
| Pandera | ```bash pip install flytekitplugins-pandera ``` | Use Pandera schemas as native Flyte types, which enable data quality checks | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-pandera/) | Flytekit-only |
| AWS Sagemaker Training | ```bash pip install flytekitplugins-awssagemaker ``` | Installs SDK to author Sagemaker built-in and custom training jobs in python | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-awssagemaker/) | Backend |
| Hive Queries | ```bash pip install flytekitplugins-hive ``` | Installs SDK to author Hive Queries that can be executed on a configured hive backend using Flyte backend plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-hive/) | Backend |
| K8s distributed PyTorch Jobs | ```bash pip install flytekitplugins-kfpytorch ``` | Installs SDK to author Distributed pyTorch Jobs in python using Kubeflow PyTorch Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-kfpytorch/) | Backend |
| K8s native tensorflow Jobs | ```bash pip install flytekitplugins-kftensorflow ``` | Installs SDK to author Distributed tensorflow Jobs in python using Kubeflow Tensorflow Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-kftensorflow/) | Backend |
| K8s native MPI Jobs | ```bash pip install flytekitplugins-kfmpi ``` | Installs SDK to author Distributed MPI Jobs in python using Kubeflow MPI Operator | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-kftensorflow/) | Backend |
| Papermill based Tasks | ```bash pip install flytekitplugins-papermill ``` | Execute entire notebooks as Flyte Tasks and pass inputs and outputs between them and python tasks | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-papermill/) | Flytekit-only |
| Pod Tasks | ```bash pip install flytekitplugins-pod ``` | Installs SDK to author Pods in python. These pods can have multiple containers, use volumes and have non exiting side-cars | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-pod/) | Flytekit-only |
| spark | ```bash pip install flytekitplugins-spark ``` | Installs SDK to author Spark jobs that can be executed natively on Kubernetes with a supported backend Flyte plugin | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-spark/) | Backend |
| AWS Athena Queries | ```bash pip install flytekitplugins-athena ``` | Installs SDK to author queries executed on AWS Athena | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-athena/) | Backend |
| DOLT | ```bash pip install flytekitplugins-dolt ``` | Read & write dolt data sets and use dolt tables as native types | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-dolt/) | Flytekit-only |
| Pandera | ```bash pip install flytekitplugins-pandera ``` | Use Pandera schemas as native Flyte types, which enable data quality checks. | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-pandera/) | Flytekit-only |
| SQLAlchemy | ```bash pip install flytekitplugins-sqlalchemy ``` | Write queries for any database that supports SQLAlchemy | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-spark.svg)](https://pypi.python.org/pypi/flytekitplugins-sqlalchemy/) | Flytekit-only |
| Great Expectations | ```bash pip install flytekitplugins-great-expectations``` | Enforce data quality for various data types within Flyte | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-great-expectations.svg)](https://pypi.python.org/pypi/flytekitplugins-great-expectations/) | Flytekit-only |
| Snowflake | ```bash pip install flytekitplugins-snowflake``` | Use Snowflake as a 'data warehouse-as-a-service' within Flyte | [![PyPI version fury.io](https://badge.fury.io/py/flytekitplugins-great-expectations.svg)](https://pypi.python.org/pypi/flytekitplugins-great-expectations/) | Backend |
Expand Down
11 changes: 11 additions & 0 deletions plugins/flytekit-kf-mpi/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Flytekit Kubeflow MPI Plugin

This plugin uses the Kubeflow MPI Operator and provides an extremely simplified interface for executing distributed training.

To install the plugin, run the following command:

```bash
pip install flytekitplugins-kfmpi
```

_Example coming soon!_
1 change: 1 addition & 0 deletions plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .task import MPIJob
136 changes: 136 additions & 0 deletions plugins/flytekit-kf-mpi/flytekitplugins/kfmpi/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""
This Plugin adds the capability of running distributed MPI training to Flyte using backend plugins, natively on
Kubernetes. It leverages `MPI Job <https://github.com/kubeflow/mpi-operator>`_ Plugin from kubeflow.
"""
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

from flyteidl.plugins import mpi_pb2 as _mpi_task
from google.protobuf.json_format import MessageToDict

from flytekit import PythonFunctionTask
from flytekit.extend import SerializationSettings, TaskPlugins
from flytekit.models import common as _common


class MPIJobModel(_common.FlyteIdlEntity):
"""Model definition for MPI the plugin
Args:
num_workers: integer determining the number of worker replicas spawned in the cluster for this job
(in addition to 1 master).
num_launcher_replicas: Number of launcher server replicas to use
slots: Number of slots per worker used in hostfile
.. note::
Please use resources=Resources(cpu="1"...) to specify per worker resource
"""

def __init__(self, num_workers, num_launcher_replicas, slots):
self._num_workers = num_workers
self._num_launcher_replicas = num_launcher_replicas
self._slots = slots

@property
def num_workers(self):
return self._num_workers

@property
def num_launcher_replicas(self):
return self._num_launcher_replicas

@property
def slots(self):
return self._slots

def to_flyte_idl(self):
return _mpi_task.DistributedMPITrainingTask(
num_workers=self.num_workers, num_launcher_replicas=self.num_launcher_replicas, slots=self.slots
)

@classmethod
def from_flyte_idl(cls, pb2_object):
return cls(
num_workers=pb2_object.num_workers,
num_launcher_replicas=pb2_object.num_launcher_replicas,
slots=pb2_object.slots,
)


@dataclass
class MPIJob(object):
"""
Configuration for an executable `MPI Job <https://github.com/kubeflow/mpi-operator>`_. Use this
to run distributed training on k8s with MPI
Args:
num_workers: integer determining the number of worker replicas spawned in the cluster for this job
(in addition to 1 master).
num_launcher_replicas: Number of launcher server replicas to use
slots: Number of slots per worker used in hostfile
"""

slots: int
num_launcher_replicas: int = 1
num_workers: int = 1


class MPIFunctionTask(PythonFunctionTask[MPIJob]):
"""
Plugin that submits a MPIJob (see https://github.com/kubeflow/mpi-operator)
defined by the code within the _task_function to k8s cluster.
"""

_MPI_JOB_TASK_TYPE = "mpi"
_MPI_BASE_COMMAND = [
"mpirun",
"--allow-run-as-root",
"-bind-to",
"none",
"-map-by",
"slot",
"-x",
"LD_LIBRARY_PATH",
"-x",
"PATH",
"-x",
"NCCL_DEBUG=INFO",
"-mca",
"pml",
"ob1",
"-mca",
"btl",
"^openib",
]

def __init__(self, task_config: MPIJob, task_function: Callable, **kwargs):
super().__init__(
task_config=task_config,
task_function=task_function,
task_type=self._MPI_JOB_TASK_TYPE,
**kwargs,
)

def get_command(self, settings: SerializationSettings) -> List[str]:
cmd = super().get_command(settings)
num_procs = self.task_config.num_workers * self.task_config.slots
mpi_cmd = self._MPI_BASE_COMMAND + ["-np", f"{num_procs}"] + ["python", settings.entrypoint_settings.path] + cmd
# the hostfile is set automatically by MPIOperator using env variable OMPI_MCA_orte_default_hostfile
return mpi_cmd

def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
job = MPIJobModel(
num_workers=self.task_config.num_workers,
num_launcher_replicas=self.task_config.num_launcher_replicas,
slots=self.task_config.slots,
)
return MessageToDict(job.to_flyte_idl())


# Register the MPI Plugin into the flytekit core plugin system
TaskPlugins.register_pythontask_plugin(MPIJob, MPIFunctionTask)
2 changes: 2 additions & 0 deletions plugins/flytekit-kf-mpi/requirements.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.
-e file:.#egg=flytekitplugins-kfmpi
8 changes: 8 additions & 0 deletions plugins/flytekit-kf-mpi/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#
# This file is autogenerated by pip-compile with python 3.8
# To update, run:
#
# pip-compile requirements.in
#
-e file:.#egg=flytekitplugins-kfmpi
# via -r requirements.in
34 changes: 34 additions & 0 deletions plugins/flytekit-kf-mpi/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from setuptools import setup

PLUGIN_NAME = "kfmpi"

microlib_name = f"flytekitplugins-{PLUGIN_NAME}"

plugin_requires = ["flytekit>=0.16.0b0,<1.0.0", "flyteidl>=0.21.4"]

__version__ = "0.0.0+develop"

setup(
name=microlib_name,
version=__version__,
author="flyteorg",
author_email="[email protected]",
description="K8s based MPI plugin for flytekit",
namespace_packages=["flytekitplugins"],
packages=[f"flytekitplugins.{PLUGIN_NAME}"],
install_requires=plugin_requires,
license="apache2",
python_requires=">=3.6",
classifiers=[
"Intended Audience :: Science/Research",
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Topic :: Scientific/Engineering",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
"Topic :: Software Development",
"Topic :: Software Development :: Libraries",
"Topic :: Software Development :: Libraries :: Python Modules",
],
)
Empty file.
45 changes: 45 additions & 0 deletions plugins/flytekit-kf-mpi/tests/test_mpi_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from flytekitplugins.kfmpi.task import MPIJob, MPIJobModel

from flytekit import Resources, task
from flytekit.core.context_manager import EntrypointSettings
from flytekit.extend import Image, ImageConfig, SerializationSettings


def test_mpi_model_task():
job = MPIJobModel(
num_workers=1,
num_launcher_replicas=1,
slots=1,
)
assert job.num_workers == 1
assert job.num_launcher_replicas == 1
assert job.slots == 1
assert job.from_flyte_idl(job.to_flyte_idl())


def test_mpi_task():
@task(
task_config=MPIJob(num_workers=10, num_launcher_replicas=10, slots=1),
requests=Resources(cpu="1"),
cache=True,
cache_version="1",
)
def my_mpi_task(x: int, y: str) -> int:
return x

assert my_mpi_task(x=10, y="hello") == 10

assert my_mpi_task.task_config is not None

default_img = Image(name="default", fqn="test", tag="tag")
settings = SerializationSettings(
project="project",
domain="domain",
version="version",
env={"FOO": "baz"},
image_config=ImageConfig(default_image=default_img, images=[default_img]),
entrypoint_settings=EntrypointSettings(path="/etc/my-entrypoint", command="my-entrypoint"),
)

assert my_mpi_task.get_custom(settings) == {"numLauncherReplicas": 10, "numWorkers": 10, "slots": 1}
assert my_mpi_task.task_type == "mpi"
1 change: 1 addition & 0 deletions plugins/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"flytekitplugins-great_expectations": "flytekit-greatexpectations",
"flytekitplugins-hive": "flytekit-hive",
"flytekitplugins-pod": "flytekit-k8s-pod",
"flytekitplugins-kfmpi": "flytekit-kf-mpi",
"flytekitplugins-kfpytorch": "flytekit-kf-pytorch",
"flytekitplugins-kftensorflow": "flytekit-kf-tensorflow",
"flytekitplugins-pandera": "flytekit-pandera",
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
]
},
install_requires=[
"flyteidl>=0.21.0,<0.22.0",
"flyteidl>=0.21.4",
"wheel>=0.30.0,<1.0.0",
"pandas>=1.0.0,<2.0.0",
"pyarrow>=2.0.0,<4.0.0",
Expand Down

0 comments on commit 8f6de3b

Please sign in to comment.