Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds comet-ml plugin #2550

Merged
merged 8 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions plugins/flytekit-comet-ml/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Flytekit Comet Plugin

Comet’s machine learning platform integrates with your existing infrastructure and tools so you can manage, visualize, and optimize models—from training runs to production monitoring. This plugin integrates Flyte with Comet.ml by configuring links between the two platforms.

To install the plugin, run:

```bash
pip install flytekitplugins-comet-ml
```

Comet requires an API key to authenticate with their platform. In the above example, a secret is created using
[Flyte's Secrets manager](https://docs.flyte.org/en/latest/user_guide/productionizing/secrets.html).

To enable linking from the Flyte side panel to Comet.ml, add the following to Flyte's configuration:

```yaml
plugins:
logs:
dynamic-log-links:
- comet-ml-execution-id:
displayName: Comet
templateUris: {{ .taskConfig.host }}/{{ .taskConfig.workspace }}/{{ .taskConfig.project_name }}/{{ .executionName }}{{ .nodeId }}{{ .taskRetryAttempt }}{{ .taskConfig.link_suffix }}
Copy link
Member

@Future-Outlier Future-Outlier Jul 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should use double quote here, right?

- comet-ml-custom-id:
displayName: Comet
templateUris: {{ .taskConfig.host }}/{{ .taskConfig.workspace }}/{{ .taskConfig.project_name }}/{{ .taskConfig.experiment_key }}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same thing

```
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .tracking import comet_ml_login

__all__ = ["comet_ml_login"]
150 changes: 150 additions & 0 deletions plugins/flytekit-comet-ml/flytekitplugins/comet_ml/tracking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import os
from hashlib import shake_256
from typing import Callable, Optional, Union

import comet_ml
from flytekit import Secret
from flytekit.core.context_manager import FlyteContextManager
from flytekit.core.utils import ClassDecorator

COMET_ML_EXECUTION_TYPE_VALUE = "comet-ml-execution-id"
COMET_ML_CUSTOM_TYPE_VALUE = "comet-ml-custom-id"


def _generate_suffix_with_length_10(project_name: str, workspace: str) -> str:
"""Generate suffix from project_name + workspace."""
h = shake_256(f"{project_name}-{workspace}".encode("utf-8"))
# Using 5 generates a suffix with length 10
return h.hexdigest(5)


def _generate_experiment_key(hostname: str, project_name: str, workspace: str) -> str:
"""Generate experiment key that comet_ml can use:

1. Is alphanumeric
2. 32 <= len(experiment_key) <= 50
"""
# In Flyte, then hostname is set to {.executionName}-{.nodeID}-{.taskRetryAttempt}, where
# - len(executionName) == 20
# - 2 <= len(nodeId) <= 8
# - 1 <= len(taskRetryAttempt)) <= 2 (In practice, retries does not go above 99)
# Removing the `-` because it is not alphanumeric, the 23 <= len(hostname) <= 30
# On the low end we need to add 10 characters to stay in the range acceptable to comet_ml
hostname = hostname.replace("-", "")
suffix = _generate_suffix_with_length_10(project_name, workspace)
return f"{hostname}{suffix}"


class comet_ml_login(ClassDecorator):
COMET_ML_PROJECT_NAME_KEY = "project_name"
COMET_ML_WORKSPACE_KEY = "workspace"
COMET_ML_EXPERIMENT_KEY_KEY = "experiment_key"
COMET_ML_URL_SUFFIX_KEY = "link_suffix"
COMET_ML_HOST_KEY = "host"

def __init__(
self,
task_function: Optional[Callable] = None,
project_name: Optional[str] = None,
workspace: Optional[str] = None,
experiment_key: Optional[str] = None,
secret: Optional[Union[Secret, Callable]] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can project_name, workspace, and secret be None Optional here in the type hints?
It will be better for the type hints

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are already Optional here. Are you suggesting to not make them Optional?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got it to work with 417d03b It forces project_name and friends to be required.

host: str = "https://www.comet.com",
**login_kwargs: dict,
):
"""Comet plugin.
Args:
task_function (function, optional): The user function to be decorated. Defaults to None.
project_name (str): Send your experiment to a specific project. (Required)
workspace (str): Attach an experiment to a project that belongs to this workspace. (Required)
experiment_key (str): Experiment key.
secret (Secret or Callable): Secret with your `COMET_API_KEY` or a callable that returns the API key.
The callable takes no arguments and returns a string. (Required)
host (str): URL to your Comet service. Defaults to "https://www.comet.com"
**login_kwargs (dict): The rest of the arguments are passed directly to `comet_ml.login`.
"""
if project_name is None:
raise ValueError("project_name must be set")
if workspace is None:
raise ValueError("workspace must be set")
if secret is None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't secret be made optional as it isn't necessary when executing locally?

Copy link
Member Author

@thomasjpfan thomasjpfan Jul 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The secret field is ignored during location execution, but required during remote. One still needs export COMET_API_KEY= for local execution.

If this was optional, we'll catch the "no secret" during execution and not at registration time. I do not want to wait until then, because these experiments can use GPUs and waiting & spinning up a GPU just to see this error is not efficient and a worst DevX.

By making secret required, I am optimizing for "failing quickly" for remote execution.

raise ValueError("secret must be set")

self.project_name = project_name
self.workspace = workspace
self.experiment_key = experiment_key
self.secret = secret
self.host = host
self.login_kwargs = login_kwargs

super().__init__(
task_function,
project_name=project_name,
workspace=workspace,
experiment_key=experiment_key,
secret=secret,
host=host,
**login_kwargs,
)

def execute(self, *args, **kwargs):
ctx = FlyteContextManager.current_context()
is_local_execution = ctx.execution_state.is_local_execution()

default_kwargs = self.login_kwargs
login_kwargs = {
"project_name": self.project_name,
"workspace": self.workspace,
**default_kwargs,
}

if is_local_execution:
# For local execution, always use the experiment_key. If `self.experiment_key` is `None`, comet_ml
# will generate it's own key
if self.experiment_key is not None:
login_kwargs["experiment_key"] = self.experiment_key
else:
# Get api key for remote execution
if isinstance(self.secret, Secret):
secrets = ctx.user_space_params.secrets
comet_ml_api_key = secrets.get(key=self.secret.key, group=self.secret.group)
else:
comet_ml_api_key = self.secret()

login_kwargs["api_key"] = comet_ml_api_key

if self.experiment_key is None:
# The HOSTNAME is set to {.executionName}-{.nodeID}-{.taskRetryAttempt}
# If HOSTNAME is not defined, use the execution name as a fallback
hostname = os.environ.get("HOSTNAME", ctx.user_space_params.execution_id.name)
experiment_key = _generate_experiment_key(hostname, self.project_name, self.workspace)
else:
experiment_key = self.experiment_key

login_kwargs["experiment_key"] = experiment_key

if hasattr(comet_ml, "login"):
comet_ml.login(**login_kwargs)
else:
comet_ml.init(**login_kwargs)

output = self.task_function(*args, **kwargs)
return output

def get_extra_config(self):
extra_config = {
self.COMET_ML_PROJECT_NAME_KEY: self.project_name,
self.COMET_ML_WORKSPACE_KEY: self.workspace,
self.COMET_ML_HOST_KEY: self.host,
}

if self.experiment_key is None:
comet_ml_value = COMET_ML_EXECUTION_TYPE_VALUE
suffix = _generate_suffix_with_length_10(self.project_name, self.workspace)
extra_config[self.COMET_ML_URL_SUFFIX_KEY] = suffix
else:
comet_ml_value = COMET_ML_CUSTOM_TYPE_VALUE
extra_config[self.COMET_ML_EXPERIMENT_KEY_KEY] = self.experiment_key

extra_config[self.LINK_TYPE_KEY] = comet_ml_value
return extra_config
39 changes: 39 additions & 0 deletions plugins/flytekit-comet-ml/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from setuptools import setup

PLUGIN_NAME = "comet-ml"
MODULE_NAME = "comet_ml"


microlib_name = f"flytekitplugins-{PLUGIN_NAME}"

plugin_requires = ["flytekit>=1.12.3", "comet-ml>=3.43.2"]

__version__ = "0.0.0+develop"

setup(
name=microlib_name,
version=__version__,
author="flyteorg",
author_email="[email protected]",
description="This package enables seamless use of Comet within Flyte",
namespace_packages=["flytekitplugins"],
packages=[f"flytekitplugins.{MODULE_NAME}"],
install_requires=plugin_requires,
license="apache2",
python_requires=">=3.8",
classifiers=[
"Intended Audience :: Science/Research",
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Scientific/Engineering",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
"Topic :: Software Development",
"Topic :: Software Development :: Libraries",
"Topic :: Software Development :: Libraries :: Python Modules",
],
)
160 changes: 160 additions & 0 deletions plugins/flytekit-comet-ml/tests/test_comet_ml_init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
from hashlib import shake_256
from unittest.mock import patch, Mock
import pytest

from flytekit import Secret, task
from flytekitplugins.comet_ml import comet_ml_login
from flytekitplugins.comet_ml.tracking import (
COMET_ML_CUSTOM_TYPE_VALUE,
COMET_ML_EXECUTION_TYPE_VALUE,
_generate_suffix_with_length_10,
_generate_experiment_key,
)


secret = Secret(key="abc", group="xyz")


@pytest.mark.parametrize("experiment_key", [None, "abc123dfassfasfsafsafd"])
def test_extra_config(experiment_key):
project_name = "abc"
workspace = "my_workspace"

comet_decorator = comet_ml_login(
project_name=project_name,
workspace=workspace,
experiment_key=experiment_key,
secret=secret
)

assert comet_decorator.secret is secret
extra_config = comet_decorator.get_extra_config()

if experiment_key is None:
assert extra_config[comet_decorator.LINK_TYPE_KEY] == COMET_ML_EXECUTION_TYPE_VALUE
assert comet_decorator.COMET_ML_EXPERIMENT_KEY_KEY not in extra_config

suffix = _generate_suffix_with_length_10(project_name=project_name, workspace=workspace)
assert extra_config[comet_decorator.COMET_ML_URL_SUFFIX_KEY] == suffix

else:
assert extra_config[comet_decorator.LINK_TYPE_KEY] == COMET_ML_CUSTOM_TYPE_VALUE
assert extra_config[comet_decorator.COMET_ML_EXPERIMENT_KEY_KEY] == experiment_key
assert comet_decorator.COMET_ML_URL_SUFFIX_KEY not in extra_config

assert extra_config[comet_decorator.COMET_ML_WORKSPACE_KEY] == workspace
assert extra_config[comet_decorator.COMET_ML_HOST_KEY] == "https://www.comet.com"


@task
@comet_ml_login(project_name="abc", workspace="my-workspace", secret=secret, log_code=False)
def train_model():
pass


@patch("flytekitplugins.comet_ml.tracking.comet_ml")
def test_local_execution(comet_ml_mock):
train_model()

comet_ml_mock.init.assert_called_with(
project_name="abc", workspace="my-workspace", log_code=False)


@task
@comet_ml_login(
project_name="xyz",
workspace="another-workspace",
secret=secret,
experiment_key="my-previous-experiment-key",
)
def train_model_with_experiment_key():
pass


@patch("flytekitplugins.comet_ml.tracking.comet_ml")
def test_local_execution_with_experiment_key(comet_ml_mock):
train_model_with_experiment_key()

comet_ml_mock.init.assert_called_with(
project_name="xyz",
workspace="another-workspace",
experiment_key="my-previous-experiment-key",
)


@patch("flytekitplugins.comet_ml.tracking.os")
@patch("flytekitplugins.comet_ml.tracking.FlyteContextManager")
@patch("flytekitplugins.comet_ml.tracking.comet_ml")
def test_remote_execution(comet_ml_mock, manager_mock, os_mock):
# Pretend that the execution is remote
ctx_mock = Mock()
ctx_mock.execution_state.is_local_execution.return_value = False

ctx_mock.user_space_params.secrets.get.return_value = "this_is_the_secret"
ctx_mock.user_space_params.execution_id.name = "my_execution_id"

manager_mock.current_context.return_value = ctx_mock
hostname = "a423423423afasf4jigl-fasj4321-0"
os_mock.environ = {"HOSTNAME": hostname}

project_name = "abc"
workspace = "my-workspace"

h = shake_256(f"{project_name}-{workspace}".encode("utf-8"))
suffix = h.hexdigest(5)
hostname_alpha = hostname.replace("-", "")
experiment_key = f"{hostname_alpha}{suffix}"

train_model()

comet_ml_mock.init.assert_called_with(
project_name="abc",
workspace="my-workspace",
api_key="this_is_the_secret",
experiment_key=experiment_key,
log_code=False,
)
ctx_mock.user_space_params.secrets.get.assert_called_with(key="abc", group="xyz")


def get_secret():
return "my-comet-ml-api-key"


@task
@comet_ml_login(project_name="my_project", workspace="my_workspace", secret=get_secret)
def train_model_with_callable_secret():
pass


@patch("flytekitplugins.comet_ml.tracking.os")
@patch("flytekitplugins.comet_ml.tracking.FlyteContextManager")
@patch("flytekitplugins.comet_ml.tracking.comet_ml")
def test_remote_execution_with_callable_secret(comet_ml_mock, manager_mock, os_mock):
# Pretend that the execution is remote
ctx_mock = Mock()
ctx_mock.execution_state.is_local_execution.return_value = False

manager_mock.current_context.return_value = ctx_mock
hostname = "a423423423afasf4jigl-fasj4321-0"
os_mock.environ = {"HOSTNAME": hostname}

train_model_with_callable_secret()

comet_ml_mock.init.assert_called_with(
project_name="my_project",
api_key="my-comet-ml-api-key",
workspace="my_workspace",
experiment_key=_generate_experiment_key(hostname, "my_project", "my_workspace")
)


def test_errors():
with pytest.raises(ValueError, match="project_name must be set"):
comet_ml_login()

with pytest.raises(ValueError, match="workspace must be set"):
comet_ml_login(project_name="abc")

with pytest.raises(ValueError, match="secret must be set"):
comet_ml_login(project_name="abc", workspace="xyz")
Loading