-
Notifications
You must be signed in to change notification settings - Fork 300
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds comet-ml plugin #2550
Adds comet-ml plugin #2550
Changes from 5 commits
ad70d64
da94ee2
117f050
4cfa4eb
58b33b0
d04d012
ead937d
cc2e9ad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
# Flytekit Comet Plugin | ||
|
||
Comet’s machine learning platform integrates with your existing infrastructure and tools so you can manage, visualize, and optimize models—from training runs to production monitoring. This plugin integrates Flyte with Comet.ml by configuring links between the two platforms. | ||
|
||
To install the plugin, run: | ||
|
||
```bash | ||
pip install flytekitplugins-comet-ml | ||
``` | ||
|
||
Comet requires an API key to authenticate with their platform. In the above example, a secret is created using | ||
[Flyte's Secrets manager](https://docs.flyte.org/en/latest/user_guide/productionizing/secrets.html). | ||
|
||
To enable linking from the Flyte side panel to Comet.ml, add the following to Flyte's configuration: | ||
|
||
```yaml | ||
plugins: | ||
logs: | ||
dynamic-log-links: | ||
- comet-ml-execution-id: | ||
displayName: Comet | ||
templateUris: {{ .taskConfig.host }}/{{ .taskConfig.workspace }}/{{ .taskConfig.project_name }}/{{ .executionName }}{{ .nodeId }}{{ .taskRetryAttempt }}{{ .taskConfig.link_suffix }} | ||
- comet-ml-custom-id: | ||
displayName: Comet | ||
templateUris: {{ .taskConfig.host }}/{{ .taskConfig.workspace }}/{{ .taskConfig.project_name }}/{{ .taskConfig.experiment_key }} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same thing |
||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from .tracking import comet_ml_init | ||
|
||
__all__ = ["comet_ml_init"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
import os | ||
from hashlib import shake_256 | ||
from typing import Callable, Optional, Union | ||
|
||
import comet_ml | ||
from flytekit import Secret | ||
from flytekit.core.context_manager import FlyteContextManager | ||
from flytekit.core.utils import ClassDecorator | ||
|
||
COMET_ML_EXECUTION_TYPE_VALUE = "comet-ml-execution-id" | ||
COMET_ML_CUSTOM_TYPE_VALUE = "comet-ml-custom-id" | ||
|
||
|
||
def _generate_suffix_with_length_10(project_name: str, workspace: str) -> str: | ||
"""Generate suffix from project_name + workspace.""" | ||
h = shake_256(f"{project_name}-{workspace}".encode("utf-8")) | ||
# Using 5 generates a suffix with length 10 | ||
return h.hexdigest(5) | ||
|
||
|
||
def _generate_experiment_key(hostname: str, project_name: str, workspace: str) -> str: | ||
"""Generate experiment key that comet_ml can use: | ||
|
||
1. Is alphanumeric | ||
2. 32 <= len(experiment_key) <= 50 | ||
""" | ||
# In Flyte, then hostname is set to {.executionName}-{.nodeID}-{.taskRetryAttempt}, where | ||
# - len(executionName) == 20 | ||
# - 2 <= len(nodeId) <= 8 | ||
# - 1 <= len(taskRetryAttempt)) <= 2 (In practice, retries does not go above 99) | ||
# Removing the `-` because it is not alphanumeric, the 23 <= len(hostname) <= 30 | ||
# On the low end we need to add 10 characters to stay in the range acceptable to comet_ml | ||
hostname = hostname.replace("-", "") | ||
suffix = _generate_suffix_with_length_10(project_name, workspace) | ||
return f"{hostname}{suffix}" | ||
|
||
|
||
class comet_ml_init(ClassDecorator): | ||
COMET_ML_PROJECT_NAME_KEY = "project_name" | ||
COMET_ML_WORKSPACE_KEY = "workspace" | ||
COMET_ML_EXPERIMENT_KEY_KEY = "experiment_key" | ||
COMET_ML_URL_SUFFIX_KEY = "link_suffix" | ||
COMET_ML_HOST_KEY = "host" | ||
|
||
def __init__( | ||
self, | ||
task_function: Optional[Callable] = None, | ||
project_name: Optional[str] = None, | ||
workspace: Optional[str] = None, | ||
experiment_key: Optional[str] = None, | ||
secret: Optional[Union[Secret, Callable]] = None, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can project_name, workspace, and secret be None There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They are already There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I got it to work with 417d03b It forces |
||
host: str = "https://www.comet.com", | ||
**init_kwargs: dict, | ||
): | ||
"""Comet plugin. | ||
Args: | ||
task_function (function, optional): The user function to be decorated. Defaults to None. | ||
project_name (str): Send your experiment to a specific project. (Required) | ||
workspace (str): Attach an experiment to a project that belongs to this workspace. (Required) | ||
experiment_key (str): Experiment key. | ||
secret (Secret or Callable): Secret with your `COMET_API_KEY` or a callable that returns the API key. | ||
The callable takes no arguments and returns a string. (Required) | ||
host (str): URL to your Comet service. Defaults to "https://www.comet.com" | ||
**init_kwargs (dict): The rest of the arguments are passed directly to `comet_ml.init`. | ||
""" | ||
if project_name is None: | ||
raise ValueError("project_name must be set") | ||
if workspace is None: | ||
raise ValueError("workspace must be set") | ||
if secret is None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shouldn't There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The secret field is ignored during location execution, but required during remote. One still needs If this was optional, we'll catch the "no secret" during execution and not at registration time. I do not want to wait until then, because these experiments can use GPUs and waiting & spinning up a GPU just to see this error is not efficient and a worst DevX. By making |
||
raise ValueError("secret must be set") | ||
|
||
self.project_name = project_name | ||
self.workspace = workspace | ||
self.experiment_key = experiment_key | ||
self.secret = secret | ||
self.host = host | ||
self.init_kwargs = init_kwargs | ||
|
||
super().__init__( | ||
task_function, | ||
project_name=project_name, | ||
workspace=workspace, | ||
experiment_key=experiment_key, | ||
secret=secret, | ||
host=host, | ||
**init_kwargs, | ||
) | ||
|
||
def execute(self, *args, **kwargs): | ||
ctx = FlyteContextManager.current_context() | ||
is_local_execution = ctx.execution_state.is_local_execution() | ||
|
||
default_kwargs = self.init_kwargs | ||
init_kwargs = { | ||
"project_name": self.project_name, | ||
"workspace": self.workspace, | ||
**default_kwargs, | ||
} | ||
|
||
if is_local_execution: | ||
# For local execution, always use the experiment_key. If `self.experiment_key` is `None`, comet_ml | ||
# will generate it's own key | ||
if self.experiment_key is not None: | ||
init_kwargs["experiment_key"] = self.experiment_key | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is api key not required for local executions? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's required, I use |
||
else: | ||
# Get api key for remote execution | ||
if isinstance(self.secret, Secret): | ||
secrets = ctx.user_space_params.secrets | ||
comet_ml_api_key = secrets.get(key=self.secret.key, group=self.secret.group) | ||
else: | ||
comet_ml_api_key = self.secret() | ||
|
||
init_kwargs["api_key"] = comet_ml_api_key | ||
|
||
if self.experiment_key is None: | ||
# The HOSTNAME is set to {.executionName}-{.nodeID}-{.taskRetryAttempt} | ||
# If HOSTNAME is not defined, use the execution name as a fallback | ||
hostname = os.environ.get("HOSTNAME", ctx.user_space_params.execution_id.name) | ||
experiment_key = _generate_experiment_key(hostname, self.project_name, self.workspace) | ||
else: | ||
experiment_key = self.experiment_key | ||
|
||
init_kwargs["experiment_key"] = experiment_key | ||
|
||
comet_ml.init(**init_kwargs) | ||
output = self.task_function(*args, **kwargs) | ||
return output | ||
|
||
def get_extra_config(self): | ||
extra_config = { | ||
self.COMET_ML_PROJECT_NAME_KEY: self.project_name, | ||
self.COMET_ML_WORKSPACE_KEY: self.workspace, | ||
self.COMET_ML_HOST_KEY: self.host, | ||
} | ||
|
||
if self.experiment_key is None: | ||
comet_ml_value = COMET_ML_EXECUTION_TYPE_VALUE | ||
suffix = _generate_suffix_with_length_10(self.project_name, self.workspace) | ||
extra_config[self.COMET_ML_URL_SUFFIX_KEY] = suffix | ||
else: | ||
comet_ml_value = COMET_ML_CUSTOM_TYPE_VALUE | ||
extra_config[self.COMET_ML_EXPERIMENT_KEY_KEY] = self.experiment_key | ||
|
||
extra_config[self.LINK_TYPE_KEY] = comet_ml_value | ||
return extra_config |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
from setuptools import setup | ||
|
||
PLUGIN_NAME = "comet-ml" | ||
MODULE_NAME = "comet_ml" | ||
|
||
|
||
microlib_name = f"flytekitplugins-{PLUGIN_NAME}" | ||
|
||
plugin_requires = ["flytekit>=1.12.3", "comet-ml>=3.43.2"] | ||
|
||
__version__ = "0.0.0+develop" | ||
|
||
setup( | ||
name=microlib_name, | ||
version=__version__, | ||
author="flyteorg", | ||
author_email="[email protected]", | ||
description="This package enables seamless use of Comet within Flyte", | ||
namespace_packages=["flytekitplugins"], | ||
packages=[f"flytekitplugins.{MODULE_NAME}"], | ||
install_requires=plugin_requires, | ||
license="apache2", | ||
python_requires=">=3.8", | ||
classifiers=[ | ||
"Intended Audience :: Science/Research", | ||
"Intended Audience :: Developers", | ||
"License :: OSI Approved :: Apache Software License", | ||
"Programming Language :: Python :: 3.8", | ||
"Programming Language :: Python :: 3.9", | ||
"Programming Language :: Python :: 3.10", | ||
"Programming Language :: Python :: 3.11", | ||
"Programming Language :: Python :: 3.12", | ||
"Topic :: Scientific/Engineering", | ||
"Topic :: Scientific/Engineering :: Artificial Intelligence", | ||
"Topic :: Software Development", | ||
"Topic :: Software Development :: Libraries", | ||
"Topic :: Software Development :: Libraries :: Python Modules", | ||
], | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
from hashlib import shake_256 | ||
from unittest.mock import patch, Mock | ||
import pytest | ||
|
||
from flytekit import Secret, task | ||
from flytekitplugins.comet_ml import comet_ml_init | ||
from flytekitplugins.comet_ml.tracking import ( | ||
COMET_ML_CUSTOM_TYPE_VALUE, | ||
COMET_ML_EXECUTION_TYPE_VALUE, | ||
_generate_suffix_with_length_10, | ||
_generate_experiment_key, | ||
) | ||
|
||
|
||
secret = Secret(key="abc", group="xyz") | ||
|
||
|
||
@pytest.mark.parametrize("experiment_key", [None, "abc123dfassfasfsafsafd"]) | ||
def test_extra_config(experiment_key): | ||
project_name = "abc" | ||
workspace = "my_workspace" | ||
|
||
comet_decorator = comet_ml_init( | ||
project_name=project_name, | ||
workspace=workspace, | ||
experiment_key=experiment_key, | ||
secret=secret | ||
) | ||
|
||
assert comet_decorator.secret is secret | ||
extra_config = comet_decorator.get_extra_config() | ||
|
||
if experiment_key is None: | ||
assert extra_config[comet_decorator.LINK_TYPE_KEY] == COMET_ML_EXECUTION_TYPE_VALUE | ||
assert comet_decorator.COMET_ML_EXPERIMENT_KEY_KEY not in extra_config | ||
|
||
suffix = _generate_suffix_with_length_10(project_name=project_name, workspace=workspace) | ||
assert extra_config[comet_decorator.COMET_ML_URL_SUFFIX_KEY] == suffix | ||
|
||
else: | ||
assert extra_config[comet_decorator.LINK_TYPE_KEY] == COMET_ML_CUSTOM_TYPE_VALUE | ||
assert extra_config[comet_decorator.COMET_ML_EXPERIMENT_KEY_KEY] == experiment_key | ||
assert comet_decorator.COMET_ML_URL_SUFFIX_KEY not in extra_config | ||
|
||
assert extra_config[comet_decorator.COMET_ML_WORKSPACE_KEY] == workspace | ||
assert extra_config[comet_decorator.COMET_ML_HOST_KEY] == "https://www.comet.com" | ||
|
||
|
||
@task | ||
@comet_ml_init(project_name="abc", workspace="my-workspace", secret=secret, log_code=False) | ||
def train_model(): | ||
pass | ||
|
||
|
||
@patch("flytekitplugins.comet_ml.tracking.comet_ml") | ||
def test_local_execution(comet_ml_mock): | ||
train_model() | ||
|
||
comet_ml_mock.init.assert_called_with( | ||
project_name="abc", workspace="my-workspace", log_code=False) | ||
|
||
|
||
@task | ||
@comet_ml_init( | ||
project_name="xyz", | ||
workspace="another-workspace", | ||
secret=secret, | ||
experiment_key="my-previous-experiment-key", | ||
) | ||
def train_model_with_experiment_key(): | ||
pass | ||
|
||
|
||
@patch("flytekitplugins.comet_ml.tracking.comet_ml") | ||
def test_local_execution_with_experiment_key(comet_ml_mock): | ||
train_model_with_experiment_key() | ||
|
||
comet_ml_mock.init.assert_called_with( | ||
project_name="xyz", | ||
workspace="another-workspace", | ||
experiment_key="my-previous-experiment-key", | ||
) | ||
|
||
|
||
@patch("flytekitplugins.comet_ml.tracking.os") | ||
@patch("flytekitplugins.comet_ml.tracking.FlyteContextManager") | ||
@patch("flytekitplugins.comet_ml.tracking.comet_ml") | ||
def test_remote_execution(comet_ml_mock, manager_mock, os_mock): | ||
# Pretend that the execution is remote | ||
ctx_mock = Mock() | ||
ctx_mock.execution_state.is_local_execution.return_value = False | ||
|
||
ctx_mock.user_space_params.secrets.get.return_value = "this_is_the_secret" | ||
ctx_mock.user_space_params.execution_id.name = "my_execution_id" | ||
|
||
manager_mock.current_context.return_value = ctx_mock | ||
hostname = "a423423423afasf4jigl-fasj4321-0" | ||
os_mock.environ = {"HOSTNAME": hostname} | ||
|
||
project_name = "abc" | ||
workspace = "my-workspace" | ||
|
||
h = shake_256(f"{project_name}-{workspace}".encode("utf-8")) | ||
suffix = h.hexdigest(5) | ||
hostname_alpha = hostname.replace("-", "") | ||
experiment_key = f"{hostname_alpha}{suffix}" | ||
|
||
train_model() | ||
|
||
comet_ml_mock.init.assert_called_with( | ||
project_name="abc", | ||
workspace="my-workspace", | ||
api_key="this_is_the_secret", | ||
experiment_key=experiment_key, | ||
log_code=False, | ||
) | ||
ctx_mock.user_space_params.secrets.get.assert_called_with(key="abc", group="xyz") | ||
|
||
|
||
def get_secret(): | ||
return "my-comet-ml-api-key" | ||
|
||
|
||
@task | ||
@comet_ml_init(project_name="my_project", workspace="my_workspace", secret=get_secret) | ||
def train_model_with_callable_secret(): | ||
pass | ||
|
||
|
||
@patch("flytekitplugins.comet_ml.tracking.os") | ||
@patch("flytekitplugins.comet_ml.tracking.FlyteContextManager") | ||
@patch("flytekitplugins.comet_ml.tracking.comet_ml") | ||
def test_remote_execution_with_callable_secret(comet_ml_mock, manager_mock, os_mock): | ||
# Pretend that the execution is remote | ||
ctx_mock = Mock() | ||
ctx_mock.execution_state.is_local_execution.return_value = False | ||
|
||
manager_mock.current_context.return_value = ctx_mock | ||
hostname = "a423423423afasf4jigl-fasj4321-0" | ||
os_mock.environ = {"HOSTNAME": hostname} | ||
|
||
train_model_with_callable_secret() | ||
|
||
comet_ml_mock.init.assert_called_with( | ||
project_name="my_project", | ||
api_key="my-comet-ml-api-key", | ||
workspace="my_workspace", | ||
experiment_key=_generate_experiment_key(hostname, "my_project", "my_workspace") | ||
) | ||
|
||
|
||
def test_errors(): | ||
with pytest.raises(ValueError, match="project_name must be set"): | ||
comet_ml_init() | ||
|
||
with pytest.raises(ValueError, match="workspace must be set"): | ||
comet_ml_init(project_name="abc") | ||
|
||
with pytest.raises(ValueError, match="secret must be set"): | ||
comet_ml_init(project_name="abc", workspace="xyz") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should use double quote here, right?