Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds neptune plugin for experiment tracking #2686

Merged
merged 13 commits into from
Aug 26, 2024
1 change: 1 addition & 0 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ jobs:
- flytekit-mlflow
- flytekit-mmcloud
- flytekit-modin
- flytekit-neptune
- flytekit-onnx-pytorch
- flytekit-onnx-scikitlearn
# onnx-tensorflow needs a version of tensorflow that does not work with protobuf>4.
Expand Down
36 changes: 36 additions & 0 deletions plugins/flytekit-neptune/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Flytekit Neptune Plugin

Neptune is the MLOps stack component for experiment tracking. It offers a single place to log, compare, store, and collaborate on experiments and models. This plugin integrates Flyte with Neptune by configuring links between the two platforms.

To install the plugin, run:

```bash
pip install flytekitplugins-neptune
```

Neptune requires an API key to authenticate with their platform. This Flyte plugin requires a `flytekit` `Secret` to be configured using [Flyte's Secrets manager](https://docs.flyte.org/en/latest/user_guide/productionizing/secrets.html).

```python
from flytekit import Secret, current_context

neptune_api_token = Secret(key="neptune_api_token", group="neptune_group")

@task
@neptune_init_run(project="flytekit/project", secret=neptune_api_token)
def neptune_task() -> bool:
ctx = current_context()
run = ctx.neptune_run
run["algorithm"] = "my_algorithm"
...
```

To enable linking from the Flyte side panel to Neptune, add the following to Flyte's configuration:

```yaml
plugins:
logs:
dynamic-log-links:
- neptune-run-id:
displayName: Neptune
templateUris: "{{ .taskConfig.host }}/{{ .taskConfig.project }}?query=(%60flyte%2Fexecution_id%60%3Astring%20%3D%20%22{{ .executionName }}-{{ .nodeId }}-{{ .taskRetryAttempt }}%22)&lbViewUnpacked=true"
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .tracking import neptune_init_run

__all__ = ["neptune_init_run"]
119 changes: 119 additions & 0 deletions plugins/flytekit-neptune/flytekitplugins/neptune/tracking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import os
from functools import partial
from typing import Callable, Union

import neptune
from flytekit import Secret
from flytekit.core.context_manager import FlyteContext, FlyteContextManager
from flytekit.core.utils import ClassDecorator

NEPTUNE_RUN_VALUE = "neptune-run-id"


def neptune_init_run(
project: str,
secret: Union[Secret, Callable],
host: str = "https://app.neptune.ai",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any particular reason host has to be passed explicitly?
This can be extracted from the NEPTUNE_API_TOKEN

json.loads(base64.b64decode(os.getenv("NEPTUNE_API_TOKEN")).decode("utf-8"))["api_url"]

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a IRL conversation with @SiddhantSadangi, we discussed how this is a limitation of Flyte and dynamic log links.

Context: The Flyte UI needs to know the host URL during registration time. There are two options:

  1. Have the host passed in explicitly (this PR)
  2. Force users to have NEPTUNE_API_TOKEN defined during registration time.

**init_run_kwargs: dict,
):
"""Neptune plugin.

Args:
project (str): Name of the project where the run should go, in the form `workspace-name/project_name`.
(Required)
secret (Secret or Callable): Secret with your `NEPTUNE_API_KEY` or a callable that returns the API key.
The callable takes no arguments and returns a string. (Required)
host (str): URL to Neptune. Defaults to "https://app.neptune.ai".
**init_run_kwargs (dict):
"""
return partial(
_neptune_init_run_class,
project=project,
secret=secret,
host=host,
**init_run_kwargs,
)


class _neptune_init_run_class(ClassDecorator):
NEPTUNE_HOST_KEY = "host"
NEPTUNE_PROJECT_KEY = "project"

def __init__(
self,
task_function: Callable,
project: str,
secret: Union[Secret, Callable],
host: str = "https://app.neptune.ai",
**init_run_kwargs: dict,
):
"""Neptune plugin. See `neptune_init_run` for documentation on the parameters.

`neptune_init_run` is the public interface to enforce that `project` and `secret`
must be passed in.
"""
self.project = project
self.secret = secret
self.host = host
self.init_run_kwargs = init_run_kwargs

super().__init__(task_function, project=project, secret=secret, host=host, **init_run_kwargs)

def _is_local_execution(self, ctx: FlyteContext) -> bool:
return ctx.execution_state.is_local_execution()

def _get_secret(self, ctx: FlyteContext) -> str:
if isinstance(self.secret, Secret):
secrets = ctx.user_space_params.secrets
return secrets.get(key=self.secret.key, group=self.secret.group)
else:
# Callable
return self.secret()

def execute(self, *args, **kwargs):
ctx = FlyteContextManager.current_context()
is_local_execution = self._is_local_execution(ctx)

init_run_kwargs = {"project": self.project, **self.init_run_kwargs}

if not is_local_execution:
init_run_kwargs["api_token"] = self._get_secret(ctx)

run = neptune.init_run(**init_run_kwargs)

if not is_local_execution:
# The HOSTNAME is set to {.executionName}-{.nodeID}-{.taskRetryAttempt}
# If HOSTNAME is not defined, use the execution name as a fallback
hostname = os.environ.get("HOSTNAME", ctx.user_space_params.execution_id.name)
# Execution specific metadata
run["flyte/execution_id"] = hostname
run["flyte/project"] = ctx.user_space_params.execution_id.project
run["flyte/domain"] = ctx.user_space_params.execution_id.domain
run["flyte/name"] = ctx.user_space_params.execution_id.name
run["flyte/raw_output_prefix"] = ctx.user_space_params.raw_output_prefix
run["flyte/output_metadata_prefix"] = ctx.user_space_params.output_metadata_prefix
run["flyte/working_directory"] = ctx.user_space_params.working_directory

# Task specific metadata
run["flyte/task/name"] = ctx.user_space_params.task_id.name
run["flyte/task/project"] = ctx.user_space_params.task_id.project
run["flyte/task/domain"] = ctx.user_space_params.task_id.domain
run["flyte/task/version"] = ctx.user_space_params.task_id.version

if (execution_url := os.getenv("FLYTE_EXECUTION_URL")) is not None:
run["flyte/execution_url"] = execution_url

new_user_params = ctx.user_space_params.builder().add_attr("NEPTUNE_RUN", run).build()
with FlyteContextManager.with_context(
ctx.with_execution_state(ctx.execution_state.with_params(user_space_params=new_user_params))
):
output = self.task_function(*args, **kwargs)
run.stop()
return output

def get_extra_config(self):
return {
self.NEPTUNE_HOST_KEY: self.host,
self.NEPTUNE_PROJECT_KEY: self.project,
self.LINK_TYPE_KEY: NEPTUNE_RUN_VALUE,
}
38 changes: 38 additions & 0 deletions plugins/flytekit-neptune/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from setuptools import setup

PLUGIN_NAME = "neptune"


microlib_name = f"flytekitplugins-{PLUGIN_NAME}"

plugin_requires = ["flytekit>=1.13.3", "neptune>=1.10.4"]

__version__ = "0.0.0+develop"

setup(
name=microlib_name,
version=__version__,
author="flyteorg",
author_email="[email protected]",
description="This package enables seamless use of Neptune within Flyte",
namespace_packages=["flytekitplugins"],
packages=[f"flytekitplugins.{PLUGIN_NAME}"],
install_requires=plugin_requires,
license="apache2",
python_requires=">=3.8",
classifiers=[
"Intended Audience :: Science/Research",
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Scientific/Engineering",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
"Topic :: Software Development",
"Topic :: Software Development :: Libraries",
"Topic :: Software Development :: Libraries :: Python Modules",
],
)
101 changes: 101 additions & 0 deletions plugins/flytekit-neptune/tests/test_neptune_init_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from unittest.mock import patch, Mock

from flytekit import Secret, task, current_context
from flytekit.core.context_manager import FlyteContextManager
from flytekitplugins.neptune import neptune_init_run
from flytekitplugins.neptune.tracking import _neptune_init_run_class

neptune_api_token = Secret(key="neptune_api_token", group="neptune_group")


def test_get_extra_config():

@neptune_init_run(project="flytekit/project", secret=neptune_api_token, tags=["my-tag"])
def my_task() -> bool:
...

config = my_task.get_extra_config()
assert config[my_task.NEPTUNE_HOST_KEY] == "https://app.neptune.ai"
assert config[my_task.NEPTUNE_PROJECT_KEY] == "flytekit/project"


@task
@neptune_init_run(project="flytekit/project", secret=neptune_api_token, tags=["my-tag"])
def neptune_task() -> bool:
ctx = current_context()
return ctx.neptune_run is not None


@patch("flytekitplugins.neptune.tracking.neptune")
def test_local_project_and_init_run_kwargs(neptune_mock):
neptune_exists = neptune_task()
assert neptune_exists

neptune_mock.init_run.assert_called_with(
project="flytekit/project", tags=["my-tag"]
)


class RunObjectMock(dict):
def __init__(self):
self._stop_called = False

def stop(self):
self._stop_called = True


@patch.object(_neptune_init_run_class, "_get_secret")
@patch.object(_neptune_init_run_class, "_is_local_execution")
@patch("flytekitplugins.neptune.tracking.neptune")
def test_remote_project_and_init_run_kwargs(
neptune_mock,
mock_is_local_execution,
mock_get_secret,
monkeypatch,
):
# Pretend that the execution is remote
mock_is_local_execution.return_value = False
api_token = "this-is-my-api-token"
mock_get_secret.return_value = api_token

host_name = "ff59abade1e7f4758baf-mainmytask-0"
execution_url = "https://my-host.com/execution_url"
monkeypatch.setenv("HOSTNAME", host_name)
monkeypatch.setenv("FLYTE_EXECUTION_URL", execution_url)

run_mock = RunObjectMock()
init_run_mock = Mock(return_value=run_mock)
neptune_mock.init_run = init_run_mock

neptune_task()

init_run_mock.assert_called_with(project="flytekit/project", tags=["my-tag"], api_token=api_token)
assert run_mock["Flyte Execution ID"] == host_name
assert run_mock["Flyte Execution URL"] == execution_url
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to update these assertions.



def test_get_secret_callable():
def get_secret():
return "abc-123"

@neptune_init_run(project="flytekit/project", secret=get_secret, tags=["my-tag"])
def my_task():
pass

ctx_mock = Mock()
assert my_task._get_secret(ctx_mock) == "abc-123"


def test_get_secret_object():
secret_obj = Secret(key="my_key", group="my_group")

@neptune_init_run(project="flytekit/project", secret=secret_obj, tags=["my-tag"])
def my_task():
pass

get_secret_mock = Mock(return_value="my-secret-value")
ctx_mock = Mock()
ctx_mock.user_space_params.secrets.get = get_secret_mock

assert my_task._get_secret(ctx_mock) == "my-secret-value"
get_secret_mock.assert_called_with(key="my_key", group="my_group")
Loading