Skip to content

Commit

Permalink
Adds comet-ml plugin (flyteorg#2550)
Browse files Browse the repository at this point in the history
* Adds comet-ml plugin

Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>

* For local execution, do not set experiment_key if it is none

Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>

* Use correct comet-ml links

Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>

* Allow host to be adjustable

Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>

* Adds comet-ml plugin

Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>

* Use new comet-ml login name

Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>

* Require the project_name workspace and secrets

Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>

---------

Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>
Signed-off-by: mao3267 <chenvincent610@gmail.com>
thomasjpfan authored and mao3267 committed Jul 29, 2024
1 parent e358e11 commit c6cb0ea
Showing 6 changed files with 395 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
@@ -316,6 +316,7 @@ jobs:
- flytekit-aws-batch
- flytekit-aws-sagemaker
- flytekit-bigquery
- flytekit-comet-ml
- flytekit-dask
- flytekit-data-fsspec
- flytekit-dbt
26 changes: 26 additions & 0 deletions plugins/flytekit-comet-ml/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Flytekit Comet Plugin

Comet’s machine learning platform integrates with your existing infrastructure and tools so you can manage, visualize, and optimize models—from training runs to production monitoring. This plugin integrates Flyte with Comet.ml by configuring links between the two platforms.

To install the plugin, run:

```bash
pip install flytekitplugins-comet-ml
```

Comet requires an API key to authenticate with its platform. Store the key as a secret with
[Flyte's Secrets manager](https://docs.flyte.org/en/latest/user_guide/productionizing/secrets.html) and pass that secret to `comet_ml_login` through its `secret` argument.

To enable linking from the Flyte side panel to Comet.ml, add the following to Flyte's configuration:

```yaml
plugins:
  logs:
    dynamic-log-links:
      - comet-ml-execution-id:
          displayName: Comet
          templateUris: "{{ .taskConfig.host }}/{{ .taskConfig.workspace }}/{{ .taskConfig.project_name }}/{{ .executionName }}{{ .nodeId }}{{ .taskRetryAttempt }}{{ .taskConfig.link_suffix }}"
      - comet-ml-custom-id:
          displayName: Comet
          templateUris: "{{ .taskConfig.host }}/{{ .taskConfig.workspace }}/{{ .taskConfig.project_name }}/{{ .taskConfig.experiment_key }}"
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .tracking import comet_ml_login

# Public API of the plugin package: only the decorator factory is exported.
__all__ = ["comet_ml_login"]
173 changes: 173 additions & 0 deletions plugins/flytekit-comet-ml/flytekitplugins/comet_ml/tracking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import os
from functools import partial
from hashlib import shake_256
from typing import Any, Callable, Optional, Union

import comet_ml
from flytekit import Secret
from flytekit.core.context_manager import FlyteContextManager
from flytekit.core.utils import ClassDecorator

# Link-type identifiers that `get_extra_config` stores in the task's extra config.
# The execution-id type is used when no experiment_key was supplied, so the key is
# derived from the execution itself.
COMET_ML_EXECUTION_TYPE_VALUE = "comet-ml-execution-id"
# The custom-id type is used when the caller supplied an explicit experiment_key.
COMET_ML_CUSTOM_TYPE_VALUE = "comet-ml-custom-id"


def _generate_suffix_with_length_10(project_name: str, workspace: str) -> str:
    """Return a 10-character hex suffix derived from project_name and workspace."""
    seed = f"{project_name}-{workspace}".encode("utf-8")
    # shake_256 produces a digest of the requested size: 5 bytes render as 10 hex characters.
    return shake_256(seed).hexdigest(5)


def _generate_experiment_key(hostname: str, project_name: str, workspace: str) -> str:
    """Build an experiment key that comet_ml accepts from the task's hostname.

    comet_ml expects a key that:
    1. Is alphanumeric
    2. Satisfies 32 <= len(experiment_key) <= 50
    """
    # In Flyte, the hostname is set to {.executionName}-{.nodeID}-{.taskRetryAttempt}, where
    # - len(executionName) == 20
    # - 2 <= len(nodeId) <= 8
    # - 1 <= len(taskRetryAttempt) <= 2 (in practice, retries do not go above 99)
    # Dropping the non-alphanumeric `-` separators leaves 23 <= len(hostname) <= 30, so the
    # 10-character suffix lifts even the shortest hostname into the range comet_ml accepts.
    compact_hostname = "".join(hostname.split("-"))
    return compact_hostname + _generate_suffix_with_length_10(project_name, workspace)


def comet_ml_login(
    project_name: str,
    workspace: str,
    secret: Union[Secret, Callable],
    experiment_key: Optional[str] = None,
    host: str = "https://www.comet.com",
    **login_kwargs: Any,
):
    """Comet plugin.

    Returns a decorator: applying it to a task function builds a
    `_comet_ml_login_class` instance wrapping that function with the
    configuration given here.

    Args:
        project_name (str): Send your experiment to a specific project. (Required)
        workspace (str): Attach an experiment to a project that belongs to this workspace. (Required)
        secret (Secret or Callable): Secret with your `COMET_API_KEY` or a callable that returns the API key.
            The callable takes no arguments and returns a string. (Required)
        experiment_key (str): Experiment key.
        host (str): URL to your Comet service. Defaults to "https://www.comet.com"
        **login_kwargs: The rest of the arguments are passed directly to `comet_ml.login`.
            Values may be of any type (for example `log_code=False`).

    Returns:
        A `functools.partial` of `_comet_ml_login_class` that still expects the
        task function as its first positional argument.
    """
    return partial(
        _comet_ml_login_class,
        project_name=project_name,
        workspace=workspace,
        secret=secret,
        experiment_key=experiment_key,
        host=host,
        **login_kwargs,
    )


class _comet_ml_login_class(ClassDecorator):
    """Task decorator that logs in to Comet before running the wrapped task function.

    Instances are normally created through `comet_ml_login`, which binds the
    configuration and leaves only `task_function` to be supplied.
    """

    # Keys of the dictionary returned by `get_extra_config`. The link templates in
    # Flyte's `dynamic-log-links` configuration read them as `.taskConfig.<key>`.
    COMET_ML_PROJECT_NAME_KEY = "project_name"
    COMET_ML_WORKSPACE_KEY = "workspace"
    COMET_ML_EXPERIMENT_KEY_KEY = "experiment_key"
    COMET_ML_URL_SUFFIX_KEY = "link_suffix"
    COMET_ML_HOST_KEY = "host"

    def __init__(
        self,
        task_function: Callable,
        project_name: str,
        workspace: str,
        secret: Union[Secret, Callable],
        experiment_key: Optional[str] = None,
        host: str = "https://www.comet.com",
        **login_kwargs: Any,
    ):
        """Comet plugin.

        Args:
            task_function (Callable): The function being decorated; `execute` calls it
                after logging in to Comet.
            project_name (str): Send your experiment to a specific project. (Required)
            workspace (str): Attach an experiment to a project that belongs to this workspace. (Required)
            secret (Secret or Callable): Secret with your `COMET_API_KEY` or a callable that returns the API key.
                The callable takes no arguments and returns a string. (Required)
            experiment_key (str): Experiment key.
            host (str): URL to your Comet service. Defaults to "https://www.comet.com"
            **login_kwargs: The rest of the arguments are passed directly to `comet_ml.login`.
                Values may be of any type (for example `log_code=False`).
        """

        self.project_name = project_name
        self.workspace = workspace
        self.experiment_key = experiment_key
        self.secret = secret
        self.host = host
        self.login_kwargs = login_kwargs

        # Hand the task function and the complete configuration on to ClassDecorator.
        super().__init__(
            task_function,
            project_name=project_name,
            workspace=workspace,
            experiment_key=experiment_key,
            secret=secret,
            host=host,
            **login_kwargs,
        )

    def execute(self, *args, **kwargs):
        """Log in to Comet, then run the wrapped task function and return its output.

        Local execution: no API key is passed, and `experiment_key` is only passed
        when the user supplied one.
        Remote execution: the API key is resolved from `self.secret`, and an
        experiment key is always passed (the user's, or one derived from the hostname).
        """
        ctx = FlyteContextManager.current_context()

        # User-supplied login kwargs are unpacked last, so they win over the two defaults.
        login_kwargs = {
            "project_name": self.project_name,
            "workspace": self.workspace,
            **self.login_kwargs,
        }

        if ctx.execution_state.is_local_execution():
            # For local execution, always use the experiment_key. If `self.experiment_key` is `None`,
            # comet_ml will generate its own key
            if self.experiment_key is not None:
                login_kwargs["experiment_key"] = self.experiment_key
        else:
            # Get api key for remote execution
            if isinstance(self.secret, Secret):
                secrets = ctx.user_space_params.secrets
                comet_ml_api_key = secrets.get(key=self.secret.key, group=self.secret.group)
            else:
                comet_ml_api_key = self.secret()

            login_kwargs["api_key"] = comet_ml_api_key

            if self.experiment_key is None:
                # The HOSTNAME is set to {.executionName}-{.nodeID}-{.taskRetryAttempt}
                # If HOSTNAME is not defined, use the execution name as a fallback
                hostname = os.environ.get("HOSTNAME", ctx.user_space_params.execution_id.name)
                experiment_key = _generate_experiment_key(hostname, self.project_name, self.workspace)
            else:
                experiment_key = self.experiment_key

            login_kwargs["experiment_key"] = experiment_key

        # Use `comet_ml.login` when the installed comet_ml provides it, `comet_ml.init` otherwise.
        if hasattr(comet_ml, "login"):
            comet_ml.login(**login_kwargs)
        else:
            comet_ml.init(**login_kwargs)

        return self.task_function(*args, **kwargs)

    def get_extra_config(self):
        """Return the link configuration for this task as a dictionary.

        The dictionary always holds the project name, workspace and host. Without
        an explicit experiment key it also holds the URL suffix and the
        execution-id link type; with one it holds the experiment key and the
        custom-id link type.
        """
        extra_config = {
            self.COMET_ML_PROJECT_NAME_KEY: self.project_name,
            self.COMET_ML_WORKSPACE_KEY: self.workspace,
            self.COMET_ML_HOST_KEY: self.host,
        }

        if self.experiment_key is None:
            comet_ml_value = COMET_ML_EXECUTION_TYPE_VALUE
            # Same suffix that `_generate_experiment_key` appends to the hostname.
            suffix = _generate_suffix_with_length_10(self.project_name, self.workspace)
            extra_config[self.COMET_ML_URL_SUFFIX_KEY] = suffix
        else:
            comet_ml_value = COMET_ML_CUSTOM_TYPE_VALUE
            extra_config[self.COMET_ML_EXPERIMENT_KEY_KEY] = self.experiment_key

        # LINK_TYPE_KEY is not defined in this class; it is expected to come from ClassDecorator.
        extra_config[self.LINK_TYPE_KEY] = comet_ml_value
        return extra_config
39 changes: 39 additions & 0 deletions plugins/flytekit-comet-ml/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from setuptools import setup

# Distribution-name suffix and importable sub-package name of this plugin.
PLUGIN_NAME = "comet-ml"
MODULE_NAME = "comet_ml"


# Full distribution name: "flytekitplugins-comet-ml".
microlib_name = f"flytekitplugins-{PLUGIN_NAME}"

# Runtime dependencies installed together with the plugin.
plugin_requires = ["flytekit>=1.12.3", "comet-ml>=3.43.2"]

# Development placeholder version.
# NOTE(review): presumably replaced with the real version at release time — confirm.
__version__ = "0.0.0+develop"

# Package metadata; only the `flytekitplugins.comet_ml` package is shipped.
setup(
    name=microlib_name,
    version=__version__,
    author="flyteorg",
    author_email="admin@flyte.org",
    description="This package enables seamless use of Comet within Flyte",
    namespace_packages=["flytekitplugins"],
    packages=[f"flytekitplugins.{MODULE_NAME}"],
    install_requires=plugin_requires,
    license="apache2",
    python_requires=">=3.8",
    classifiers=[
        "Intended Audience :: Science/Research",
        "Intended Audience :: Developers",
        "License :: OSI Approved :: Apache Software License",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
        "Programming Language :: Python :: 3.11",
        "Programming Language :: Python :: 3.12",
        "Topic :: Scientific/Engineering",
        "Topic :: Scientific/Engineering :: Artificial Intelligence",
        "Topic :: Software Development",
        "Topic :: Software Development :: Libraries",
        "Topic :: Software Development :: Libraries :: Python Modules",
    ],
)
153 changes: 153 additions & 0 deletions plugins/flytekit-comet-ml/tests/test_comet_ml_init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
from hashlib import shake_256
from unittest.mock import patch, Mock
import pytest

from flytekit import Secret, task
from flytekitplugins.comet_ml import comet_ml_login
from flytekitplugins.comet_ml.tracking import (
COMET_ML_CUSTOM_TYPE_VALUE,
COMET_ML_EXECUTION_TYPE_VALUE,
_generate_suffix_with_length_10,
_generate_experiment_key,
)


# Secret shared by the decorated tasks below that authenticate through a Flyte Secret;
# test_remote_execution asserts that its key and group are used for the lookup.
secret = Secret(key="abc", group="xyz")


@pytest.mark.parametrize("experiment_key", [None, "abc123dfassfasfsafsafd"])
def test_extra_config(experiment_key):
    """Check the extra config produced with and without an explicit experiment key."""
    project_name = "abc"
    workspace = "my_workspace"

    comet_decorator = comet_ml_login(
        project_name=project_name,
        workspace=workspace,
        experiment_key=experiment_key,
        secret=secret,
    )

    # Named `decorated` rather than `task` so it does not shadow flytekit's `task` import.
    @comet_decorator
    def decorated():
        pass

    assert decorated.secret is secret
    extra_config = decorated.get_extra_config()

    if experiment_key is None:
        # No explicit key: the link is built from the execution id plus a suffix.
        assert extra_config[decorated.LINK_TYPE_KEY] == COMET_ML_EXECUTION_TYPE_VALUE
        assert decorated.COMET_ML_EXPERIMENT_KEY_KEY not in extra_config

        suffix = _generate_suffix_with_length_10(project_name=project_name, workspace=workspace)
        assert extra_config[decorated.COMET_ML_URL_SUFFIX_KEY] == suffix

    else:
        # Explicit key: the link uses the key directly and needs no suffix.
        assert extra_config[decorated.LINK_TYPE_KEY] == COMET_ML_CUSTOM_TYPE_VALUE
        assert extra_config[decorated.COMET_ML_EXPERIMENT_KEY_KEY] == experiment_key
        assert decorated.COMET_ML_URL_SUFFIX_KEY not in extra_config

    # These entries are present in both modes.
    assert extra_config[decorated.COMET_ML_PROJECT_NAME_KEY] == project_name
    assert extra_config[decorated.COMET_ML_WORKSPACE_KEY] == workspace
    assert extra_config[decorated.COMET_ML_HOST_KEY] == "https://www.comet.com"


# No-op task shared by the local and remote execution tests. `log_code=False` is an
# extra keyword that the tests expect to reach the Comet login call unchanged.
@task
@comet_ml_login(
    project_name="abc",
    workspace="my-workspace",
    secret=secret,
    log_code=False,
)
def train_model():
    pass


@patch("flytekitplugins.comet_ml.tracking.comet_ml")
def test_local_execution(comet_ml_mock):
    """A local run logs in without an API key and without an experiment key."""
    train_model()

    expected_kwargs = {
        "project_name": "abc",
        "workspace": "my-workspace",
        "log_code": False,
    }
    comet_ml_mock.login.assert_called_with(**expected_kwargs)


# No-op task that pins an explicit experiment key instead of deriving one.
@task
@comet_ml_login(
    secret=secret,
    workspace="another-workspace",
    project_name="xyz",
    experiment_key="my-previous-experiment-key",
)
def train_model_with_experiment_key():
    pass


@patch("flytekitplugins.comet_ml.tracking.comet_ml")
def test_local_execution_with_experiment_key(comet_ml_mock):
    """A local run forwards a user-supplied experiment key to the login call."""
    train_model_with_experiment_key()

    expected_kwargs = {
        "project_name": "xyz",
        "workspace": "another-workspace",
        "experiment_key": "my-previous-experiment-key",
    }
    comet_ml_mock.login.assert_called_with(**expected_kwargs)


@patch("flytekitplugins.comet_ml.tracking.os")
@patch("flytekitplugins.comet_ml.tracking.FlyteContextManager")
@patch("flytekitplugins.comet_ml.tracking.comet_ml")
def test_remote_execution(comet_ml_mock, manager_mock, os_mock):
    """A remote run passes the stored API key and a hostname-derived experiment key."""
    hostname = "a423423423afasf4jigl-fasj4321-0"
    project_name, workspace = "abc", "my-workspace"

    # Pretend that the execution is remote and that the secret store holds the API key.
    remote_ctx = Mock()
    remote_ctx.execution_state.is_local_execution.return_value = False
    remote_ctx.user_space_params.secrets.get.return_value = "this_is_the_secret"
    remote_ctx.user_space_params.execution_id.name = "my_execution_id"
    manager_mock.current_context.return_value = remote_ctx
    os_mock.environ = {"HOSTNAME": hostname}

    # Recompute the expected key by hand, independently of the plugin's helpers.
    digest = shake_256(f"{project_name}-{workspace}".encode("utf-8")).hexdigest(5)
    expected_key = hostname.replace("-", "") + digest

    train_model()

    expected_kwargs = {
        "project_name": "abc",
        "workspace": "my-workspace",
        "api_key": "this_is_the_secret",
        "experiment_key": expected_key,
        "log_code": False,
    }
    comet_ml_mock.login.assert_called_with(**expected_kwargs)
    remote_ctx.user_space_params.secrets.get.assert_called_with(key="abc", group="xyz")


def get_secret():
    """Return the fixed API key used by the callable-secret test."""
    api_key = "my-comet-ml-api-key"
    return api_key


# No-op task whose API key comes from a callable rather than a Flyte Secret.
@task
@comet_ml_login(
    project_name="my_project",
    workspace="my_workspace",
    secret=get_secret,
)
def train_model_with_callable_secret():
    pass


@patch("flytekitplugins.comet_ml.tracking.os")
@patch("flytekitplugins.comet_ml.tracking.FlyteContextManager")
@patch("flytekitplugins.comet_ml.tracking.comet_ml")
def test_remote_execution_with_callable_secret(comet_ml_mock, manager_mock, os_mock):
    """A remote run obtains the API key by calling the callable secret."""
    hostname = "a423423423afasf4jigl-fasj4321-0"

    # Pretend that the execution is remote and expose the hostname via the environment.
    remote_ctx = Mock()
    remote_ctx.execution_state.is_local_execution.return_value = False
    manager_mock.current_context.return_value = remote_ctx
    os_mock.environ = {"HOSTNAME": hostname}

    train_model_with_callable_secret()

    expected_kwargs = {
        "project_name": "my_project",
        "workspace": "my_workspace",
        "api_key": "my-comet-ml-api-key",
        "experiment_key": _generate_experiment_key(hostname, "my_project", "my_workspace"),
    }
    comet_ml_mock.login.assert_called_with(**expected_kwargs)

0 comments on commit c6cb0ea

Please sign in to comment.