diff --git a/.github/workflows/pythonbuild.yml b/.github/workflows/pythonbuild.yml index 8d450ffc87..005658497b 100644 --- a/.github/workflows/pythonbuild.yml +++ b/.github/workflows/pythonbuild.yml @@ -316,6 +316,7 @@ jobs: - flytekit-aws-batch - flytekit-aws-sagemaker - flytekit-bigquery + - flytekit-comet-ml - flytekit-dask - flytekit-data-fsspec - flytekit-dbt diff --git a/plugins/flytekit-comet-ml/README.md b/plugins/flytekit-comet-ml/README.md new file mode 100644 index 0000000000..a7038c8caf --- /dev/null +++ b/plugins/flytekit-comet-ml/README.md @@ -0,0 +1,26 @@ +# Flytekit Comet Plugin + +Comet’s machine learning platform integrates with your existing infrastructure and tools so you can manage, visualize, and optimize models—from training runs to production monitoring. This plugin integrates Flyte with Comet.ml by configuring links between the two platforms. + +To install the plugin, run: + +```bash +pip install flytekitplugins-comet-ml +``` + +Comet requires an API key to authenticate with their platform. In the above example, a secret is created using +[Flyte's Secrets manager](https://docs.flyte.org/en/latest/user_guide/productionizing/secrets.html). + +To enable linking from the Flyte side panel to Comet.ml, add the following to Flyte's configuration: + +```yaml +plugins: + logs: + dynamic-log-links: + - comet-ml-execution-id: + displayName: Comet + templateUris: "{{ .taskConfig.host }}/{{ .taskConfig.workspace }}/{{ .taskConfig.project_name }}/{{ .executionName }}{{ .nodeId }}{{ .taskRetryAttempt }}{{ .taskConfig.link_suffix }}" + - comet-ml-custom-id: + displayName: Comet + templateUris: "{{ .taskConfig.host }}/{{ .taskConfig.workspace }}/{{ .taskConfig.project_name }}/{{ .taskConfig.experiment_key }}" +``` diff --git a/plugins/flytekit-comet-ml/flytekitplugins/comet_ml/__init__.py b/plugins/flytekit-comet-ml/flytekitplugins/comet_ml/__init__.py new file mode 100644 index 0000000000..58dbff81d2 --- /dev/null +++ b/plugins/flytekit-comet-ml/flytekitplugins/comet_ml/__init__.py @@ -0,0 +1,3 @@ +from .tracking import comet_ml_login + +__all__ = ["comet_ml_login"] diff --git a/plugins/flytekit-comet-ml/flytekitplugins/comet_ml/tracking.py b/plugins/flytekit-comet-ml/flytekitplugins/comet_ml/tracking.py new file mode 100644 index 0000000000..3014513d0d --- /dev/null +++ b/plugins/flytekit-comet-ml/flytekitplugins/comet_ml/tracking.py @@ -0,0 +1,173 @@ +import os +from functools import partial +from hashlib import shake_256 +from typing import Callable, Optional, Union + +import comet_ml +from flytekit import Secret +from flytekit.core.context_manager import FlyteContextManager +from flytekit.core.utils import ClassDecorator + +COMET_ML_EXECUTION_TYPE_VALUE = "comet-ml-execution-id" +COMET_ML_CUSTOM_TYPE_VALUE = "comet-ml-custom-id" + + +def _generate_suffix_with_length_10(project_name: str, workspace: str) -> str: + """Generate suffix from project_name + workspace.""" + h = shake_256(f"{project_name}-{workspace}".encode("utf-8")) + # Using 5 generates a suffix with length 10 + return h.hexdigest(5) + + +def _generate_experiment_key(hostname: str, project_name: str, workspace: str) -> str: + """Generate experiment key that comet_ml can use: + + 1. Is alphanumeric + 2. 32 <= len(experiment_key) <= 50 + """ + # In Flyte, then hostname is set to {.executionName}-{.nodeID}-{.taskRetryAttempt}, where + # - len(executionName) == 20 + # - 2 <= len(nodeId) <= 8 + # - 1 <= len(taskRetryAttempt)) <= 2 (In practice, retries does not go above 99) + # Removing the `-` because it is not alphanumeric, the 23 <= len(hostname) <= 30 + # On the low end we need to add 10 characters to stay in the range acceptable to comet_ml + hostname = hostname.replace("-", "") + suffix = _generate_suffix_with_length_10(project_name, workspace) + return f"{hostname}{suffix}" + + +def comet_ml_login( + project_name: str, + workspace: str, + secret: Union[Secret, Callable], + experiment_key: Optional[str] = None, + host: str = "https://www.comet.com", + **login_kwargs: dict, +): + """Comet plugin. + Args: + project_name (str): Send your experiment to a specific project. (Required) + workspace (str): Attach an experiment to a project that belongs to this workspace. (Required) + secret (Secret or Callable): Secret with your `COMET_API_KEY` or a callable that returns the API key. + The callable takes no arguments and returns a string. (Required) + experiment_key (str): Experiment key. + host (str): URL to your Comet service. Defaults to "https://www.comet.com" + **login_kwargs (dict): The rest of the arguments are passed directly to `comet_ml.login`. + """ + return partial( + _comet_ml_login_class, + project_name=project_name, + workspace=workspace, + secret=secret, + experiment_key=experiment_key, + host=host, + **login_kwargs, + ) + + +class _comet_ml_login_class(ClassDecorator): + COMET_ML_PROJECT_NAME_KEY = "project_name" + COMET_ML_WORKSPACE_KEY = "workspace" + COMET_ML_EXPERIMENT_KEY_KEY = "experiment_key" + COMET_ML_URL_SUFFIX_KEY = "link_suffix" + COMET_ML_HOST_KEY = "host" + + def __init__( + self, + task_function: Callable, + project_name: str, + workspace: str, + secret: Union[Secret, Callable], + experiment_key: Optional[str] = None, + host: str = "https://www.comet.com", + **login_kwargs: dict, + ): + """Comet plugin. + Args: + project_name (str): Send your experiment to a specific project. (Required) + workspace (str): Attach an experiment to a project that belongs to this workspace. (Required) + secret (Secret or Callable): Secret with your `COMET_API_KEY` or a callable that returns the API key. + The callable takes no arguments and returns a string. (Required) + experiment_key (str): Experiment key. + host (str): URL to your Comet service. Defaults to "https://www.comet.com" + **login_kwargs (dict): The rest of the arguments are passed directly to `comet_ml.login`. + """ + + self.project_name = project_name + self.workspace = workspace + self.experiment_key = experiment_key + self.secret = secret + self.host = host + self.login_kwargs = login_kwargs + + super().__init__( + task_function, + project_name=project_name, + workspace=workspace, + experiment_key=experiment_key, + secret=secret, + host=host, + **login_kwargs, + ) + + def execute(self, *args, **kwargs): + ctx = FlyteContextManager.current_context() + is_local_execution = ctx.execution_state.is_local_execution() + + default_kwargs = self.login_kwargs + login_kwargs = { + "project_name": self.project_name, + "workspace": self.workspace, + **default_kwargs, + } + + if is_local_execution: + # For local execution, always use the experiment_key. If `self.experiment_key` is `None`, comet_ml + # will generate it's own key + if self.experiment_key is not None: + login_kwargs["experiment_key"] = self.experiment_key + else: + # Get api key for remote execution + if isinstance(self.secret, Secret): + secrets = ctx.user_space_params.secrets + comet_ml_api_key = secrets.get(key=self.secret.key, group=self.secret.group) + else: + comet_ml_api_key = self.secret() + + login_kwargs["api_key"] = comet_ml_api_key + + if self.experiment_key is None: + # The HOSTNAME is set to {.executionName}-{.nodeID}-{.taskRetryAttempt} + # If HOSTNAME is not defined, use the execution name as a fallback + hostname = os.environ.get("HOSTNAME", ctx.user_space_params.execution_id.name) + experiment_key = _generate_experiment_key(hostname, self.project_name, self.workspace) + else: + experiment_key = self.experiment_key + + login_kwargs["experiment_key"] = experiment_key + + if hasattr(comet_ml, "login"): + comet_ml.login(**login_kwargs) + else: + comet_ml.init(**login_kwargs) + + output = self.task_function(*args, **kwargs) + return output + + def get_extra_config(self): + extra_config = { + self.COMET_ML_PROJECT_NAME_KEY: self.project_name, + self.COMET_ML_WORKSPACE_KEY: self.workspace, + self.COMET_ML_HOST_KEY: self.host, + } + + if self.experiment_key is None: + comet_ml_value = COMET_ML_EXECUTION_TYPE_VALUE + suffix = _generate_suffix_with_length_10(self.project_name, self.workspace) + extra_config[self.COMET_ML_URL_SUFFIX_KEY] = suffix + else: + comet_ml_value = COMET_ML_CUSTOM_TYPE_VALUE + extra_config[self.COMET_ML_EXPERIMENT_KEY_KEY] = self.experiment_key + + extra_config[self.LINK_TYPE_KEY] = comet_ml_value + return extra_config diff --git a/plugins/flytekit-comet-ml/setup.py b/plugins/flytekit-comet-ml/setup.py new file mode 100644 index 0000000000..387b9119e3 --- /dev/null +++ b/plugins/flytekit-comet-ml/setup.py @@ -0,0 +1,39 @@ +from setuptools import setup + +PLUGIN_NAME = "comet-ml" +MODULE_NAME = "comet_ml" + + +microlib_name = f"flytekitplugins-{PLUGIN_NAME}" + +plugin_requires = ["flytekit>=1.12.3", "comet-ml>=3.43.2"] + +__version__ = "0.0.0+develop" + +setup( + name=microlib_name, + version=__version__, + author="flyteorg", + author_email="admin@flyte.org", + description="This package enables seamless use of Comet within Flyte", + namespace_packages=["flytekitplugins"], + packages=[f"flytekitplugins.{MODULE_NAME}"], + install_requires=plugin_requires, + license="apache2", + python_requires=">=3.8", + classifiers=[ + "Intended Audience :: Science/Research", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Scientific/Engineering", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + "Topic :: Software Development", + "Topic :: Software Development :: Libraries", + "Topic :: Software Development :: Libraries :: Python Modules", + ], +) diff --git a/plugins/flytekit-comet-ml/tests/test_comet_ml_init.py b/plugins/flytekit-comet-ml/tests/test_comet_ml_init.py new file mode 100644 index 0000000000..5572e4a56e --- /dev/null +++ b/plugins/flytekit-comet-ml/tests/test_comet_ml_init.py @@ -0,0 +1,153 @@ +from hashlib import shake_256 +from unittest.mock import patch, Mock +import pytest + +from flytekit import Secret, task +from flytekitplugins.comet_ml import comet_ml_login +from flytekitplugins.comet_ml.tracking import ( + COMET_ML_CUSTOM_TYPE_VALUE, + COMET_ML_EXECUTION_TYPE_VALUE, + _generate_suffix_with_length_10, + _generate_experiment_key, +) + + +secret = Secret(key="abc", group="xyz") + + +@pytest.mark.parametrize("experiment_key", [None, "abc123dfassfasfsafsafd"]) +def test_extra_config(experiment_key): + project_name = "abc" + workspace = "my_workspace" + + comet_decorator = comet_ml_login( + project_name=project_name, + workspace=workspace, + experiment_key=experiment_key, + secret=secret + ) + + @comet_decorator + def task(): + pass + + assert task.secret is secret + extra_config = task.get_extra_config() + + if experiment_key is None: + assert extra_config[task.LINK_TYPE_KEY] == COMET_ML_EXECUTION_TYPE_VALUE + assert task.COMET_ML_EXPERIMENT_KEY_KEY not in extra_config + + suffix = _generate_suffix_with_length_10(project_name=project_name, workspace=workspace) + assert extra_config[task.COMET_ML_URL_SUFFIX_KEY] == suffix + + else: + assert extra_config[task.LINK_TYPE_KEY] == COMET_ML_CUSTOM_TYPE_VALUE + assert extra_config[task.COMET_ML_EXPERIMENT_KEY_KEY] == experiment_key + assert task.COMET_ML_URL_SUFFIX_KEY not in extra_config + + assert extra_config[task.COMET_ML_WORKSPACE_KEY] == workspace + assert extra_config[task.COMET_ML_HOST_KEY] == "https://www.comet.com" + + +@task +@comet_ml_login(project_name="abc", workspace="my-workspace", secret=secret, log_code=False) +def train_model(): + pass + + +@patch("flytekitplugins.comet_ml.tracking.comet_ml") +def test_local_execution(comet_ml_mock): + train_model() + + comet_ml_mock.login.assert_called_with( + project_name="abc", workspace="my-workspace", log_code=False) + + +@task +@comet_ml_login( + project_name="xyz", + workspace="another-workspace", + secret=secret, + experiment_key="my-previous-experiment-key", +) +def train_model_with_experiment_key(): + pass + + +@patch("flytekitplugins.comet_ml.tracking.comet_ml") +def test_local_execution_with_experiment_key(comet_ml_mock): + train_model_with_experiment_key() + + comet_ml_mock.login.assert_called_with( + project_name="xyz", + workspace="another-workspace", + experiment_key="my-previous-experiment-key", + ) + + +@patch("flytekitplugins.comet_ml.tracking.os") +@patch("flytekitplugins.comet_ml.tracking.FlyteContextManager") +@patch("flytekitplugins.comet_ml.tracking.comet_ml") +def test_remote_execution(comet_ml_mock, manager_mock, os_mock): + # Pretend that the execution is remote + ctx_mock = Mock() + ctx_mock.execution_state.is_local_execution.return_value = False + + ctx_mock.user_space_params.secrets.get.return_value = "this_is_the_secret" + ctx_mock.user_space_params.execution_id.name = "my_execution_id" + + manager_mock.current_context.return_value = ctx_mock + hostname = "a423423423afasf4jigl-fasj4321-0" + os_mock.environ = {"HOSTNAME": hostname} + + project_name = "abc" + workspace = "my-workspace" + + h = shake_256(f"{project_name}-{workspace}".encode("utf-8")) + suffix = h.hexdigest(5) + hostname_alpha = hostname.replace("-", "") + experiment_key = f"{hostname_alpha}{suffix}" + + train_model() + + comet_ml_mock.login.assert_called_with( + project_name="abc", + workspace="my-workspace", + api_key="this_is_the_secret", + experiment_key=experiment_key, + log_code=False, + ) + ctx_mock.user_space_params.secrets.get.assert_called_with(key="abc", group="xyz") + + +def get_secret(): + return "my-comet-ml-api-key" + + +@task +@comet_ml_login(project_name="my_project", workspace="my_workspace", secret=get_secret) +def train_model_with_callable_secret(): + pass + + +@patch("flytekitplugins.comet_ml.tracking.os") +@patch("flytekitplugins.comet_ml.tracking.FlyteContextManager") +@patch("flytekitplugins.comet_ml.tracking.comet_ml") +def test_remote_execution_with_callable_secret(comet_ml_mock, manager_mock, os_mock): + # Pretend that the execution is remote + ctx_mock = Mock() + ctx_mock.execution_state.is_local_execution.return_value = False + + manager_mock.current_context.return_value = ctx_mock + hostname = "a423423423afasf4jigl-fasj4321-0" + os_mock.environ = {"HOSTNAME": hostname} + + train_model_with_callable_secret() + + comet_ml_mock.login.assert_called_with( + project_name="my_project", + api_key="my-comet-ml-api-key", + workspace="my_workspace", + experiment_key=_generate_experiment_key(hostname, "my_project", "my_workspace") + )