Skip to content

Commit

Permalink
Adds neptune plugin for experiment tracking (#2686)
Browse files Browse the repository at this point in the history
* Adds neptune plugin for experiment tracking

Signed-off-by: Thomas J. Fan <[email protected]>

* Adds neptune to github actions

Signed-off-by: Thomas J. Fan <[email protected]>

* Fix unit tests

Signed-off-by: Thomas J. Fan <[email protected]>

* Add more information about context

Signed-off-by: Thomas J. Fan <[email protected]>

* Use NEPTUNE_API_KEY

Signed-off-by: Thomas J. Fan <[email protected]>

* Use flyte namespace for logging

Signed-off-by: Thomas J. Fan <[email protected]>

* Update README.md

Signed-off-by: Thomas J. Fan <[email protected]>

* Update README.md

Signed-off-by: Thomas J. Fan <[email protected]>

* Add more flyte specific metadata

Signed-off-by: Thomas J. Fan <[email protected]>

* Add more flyte specific metadata

Signed-off-by: Thomas J. Fan <[email protected]>

* Fix tests with new names

Signed-off-by: Thomas J. Fan <[email protected]>

---------

Signed-off-by: Thomas J. Fan <[email protected]>
  • Loading branch information
thomasjpfan authored Aug 26, 2024
1 parent 9d90dd1 commit 64c56f8
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ jobs:
- flytekit-mlflow
- flytekit-mmcloud
- flytekit-modin
- flytekit-neptune
- flytekit-onnx-pytorch
- flytekit-onnx-scikitlearn
# onnx-tensorflow needs a version of tensorflow that does not work with protobuf>4.
Expand Down
36 changes: 36 additions & 0 deletions plugins/flytekit-neptune/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Flytekit Neptune Plugin

Neptune is the MLOps stack component for experiment tracking. It offers a single place to log, compare, store, and collaborate on experiments and models. This plugin integrates Flyte with Neptune by configuring links between the two platforms.

To install the plugin, run:

```bash
pip install flytekitplugins-neptune
```

Neptune requires an API key to authenticate with their platform. This Flyte plugin requires a `flytekit` `Secret` to be configured using [Flyte's Secrets manager](https://docs.flyte.org/en/latest/user_guide/productionizing/secrets.html).

```python
from flytekit import Secret, current_context, task
from flytekitplugins.neptune import neptune_init_run

neptune_api_token = Secret(key="neptune_api_token", group="neptune_group")


# The task must request the secret so that Flyte makes it available at runtime.
@task(secret_requests=[neptune_api_token])
@neptune_init_run(project="flytekit/project", secret=neptune_api_token)
def neptune_task() -> bool:
    # The decorator exposes the active Neptune run on the task's context.
    ctx = current_context()
    run = ctx.neptune_run
    run["algorithm"] = "my_algorithm"
    ...
```

To enable linking from the Flyte side panel to Neptune, add the following to Flyte's configuration:

```yaml
plugins:
  logs:
    dynamic-log-links:
      - neptune-run-id:
          displayName: Neptune
          templateUris: "{{ .taskConfig.host }}/{{ .taskConfig.project }}?query=(%60flyte%2Fexecution_id%60%3Astring%20%3D%20%22{{ .executionName }}-{{ .nodeId }}-{{ .taskRetryAttempt }}%22)&lbViewUnpacked=true"
```
3 changes: 3 additions & 0 deletions plugins/flytekit-neptune/flytekitplugins/neptune/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .tracking import neptune_init_run

__all__ = ["neptune_init_run"]
119 changes: 119 additions & 0 deletions plugins/flytekit-neptune/flytekitplugins/neptune/tracking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import os
from functools import partial
from typing import Callable, Union

import neptune
from flytekit import Secret
from flytekit.core.context_manager import FlyteContext, FlyteContextManager
from flytekit.core.utils import ClassDecorator

NEPTUNE_RUN_VALUE = "neptune-run-id"


def neptune_init_run(
    project: str,
    secret: Union[Secret, Callable],
    host: str = "https://app.neptune.ai",
    **init_run_kwargs: dict,
):
    """Build a task decorator that tracks the decorated task in a Neptune run.

    Args:
        project (str): Name of the project where the run should go, in the form
            `workspace-name/project_name`. (Required)
        secret (Secret or Callable): Secret with your `NEPTUNE_API_KEY` or a callable that
            returns the API key. The callable takes no arguments and returns a string.
            (Required)
        host (str): URL to Neptune. Defaults to "https://app.neptune.ai".
        **init_run_kwargs (dict): Additional keyword arguments. They are stored on the
            decorator and passed on to `neptune.init_run` when the task executes.

    Returns:
        A `functools.partial` of the decorator class with all options bound; calling it
        with the task function produces the decorated task.
    """
    # Collect every option first so the partial below only binds keyword arguments.
    decorator_options = {
        "project": project,
        "secret": secret,
        "host": host,
        **init_run_kwargs,
    }
    return partial(_neptune_init_run_class, **decorator_options)


class _neptune_init_run_class(ClassDecorator):
    """Class-based task decorator that wraps a task execution in a Neptune run.

    A run is created before the task function is called, made available to the
    task through the user-space params as the `NEPTUNE_RUN` attribute, and
    stopped once the task function finishes (successfully or not).
    """

    # Keys of the extra task config; the README link template reads them as
    # `.taskConfig.host` and `.taskConfig.project`.
    NEPTUNE_HOST_KEY = "host"
    NEPTUNE_PROJECT_KEY = "project"

    def __init__(
        self,
        task_function: Callable,
        project: str,
        secret: Union[Secret, Callable],
        host: str = "https://app.neptune.ai",
        **init_run_kwargs: dict,
    ):
        """Neptune plugin. See `neptune_init_run` for documentation on the parameters.

        `neptune_init_run` is the public interface to enforce that `project` and `secret`
        must be passed in.
        """
        self.project = project
        self.secret = secret
        self.host = host
        self.init_run_kwargs = init_run_kwargs

        super().__init__(task_function, project=project, secret=secret, host=host, **init_run_kwargs)

    def _is_local_execution(self, ctx: FlyteContext) -> bool:
        """Return whether the execution state of `ctx` reports a local execution."""
        return ctx.execution_state.is_local_execution()

    def _get_secret(self, ctx: FlyteContext) -> str:
        """Resolve the Neptune API token.

        A `Secret` is looked up through the secrets manager of the context's
        user-space params; anything else is treated as a zero-argument callable
        and its return value is used as the token.
        """
        if isinstance(self.secret, Secret):
            secrets = ctx.user_space_params.secrets
            return secrets.get(key=self.secret.key, group=self.secret.group)
        else:
            # Callable
            return self.secret()

    def execute(self, *args, **kwargs):
        """Run the wrapped task function inside a Neptune run.

        Args:
            *args: Positional arguments forwarded to the task function.
            **kwargs: Keyword arguments forwarded to the task function.

        Returns:
            Whatever the task function returns.

        Raises:
            Any exception raised while logging metadata or by the task function is
            propagated unchanged; the Neptune run is stopped first.
        """
        ctx = FlyteContextManager.current_context()
        is_local_execution = self._is_local_execution(ctx)

        init_run_kwargs = {"project": self.project, **self.init_run_kwargs}

        # Only non-local executions resolve the token through the configured secret;
        # locally no `api_token` is passed and credentials are left to `neptune.init_run`.
        if not is_local_execution:
            init_run_kwargs["api_token"] = self._get_secret(ctx)

        run = neptune.init_run(**init_run_kwargs)

        # Everything after the run has been created is guarded so that the run is
        # always stopped, even when metadata logging or the task function raises.
        try:
            if not is_local_execution:
                params = ctx.user_space_params

                # The HOSTNAME is set to {.executionName}-{.nodeID}-{.taskRetryAttempt}
                # If HOSTNAME is not defined, use the execution name as a fallback
                hostname = os.environ.get("HOSTNAME", params.execution_id.name)
                # Execution specific metadata
                run["flyte/execution_id"] = hostname
                run["flyte/project"] = params.execution_id.project
                run["flyte/domain"] = params.execution_id.domain
                run["flyte/name"] = params.execution_id.name
                run["flyte/raw_output_prefix"] = params.raw_output_prefix
                run["flyte/output_metadata_prefix"] = params.output_metadata_prefix
                run["flyte/working_directory"] = params.working_directory

                # Task specific metadata
                run["flyte/task/name"] = params.task_id.name
                run["flyte/task/project"] = params.task_id.project
                run["flyte/task/domain"] = params.task_id.domain
                run["flyte/task/version"] = params.task_id.version

                if (execution_url := os.getenv("FLYTE_EXECUTION_URL")) is not None:
                    run["flyte/execution_url"] = execution_url

            # Expose the run to the task body through the user-space params.
            new_user_params = ctx.user_space_params.builder().add_attr("NEPTUNE_RUN", run).build()
            with FlyteContextManager.with_context(
                ctx.with_execution_state(ctx.execution_state.with_params(user_space_params=new_user_params))
            ):
                return self.task_function(*args, **kwargs)
        finally:
            run.stop()

    def get_extra_config(self):
        """Return the extra task config: Neptune host, project and the link type.

        `LINK_TYPE_KEY` is not defined in this class; it is presumably inherited
        from `ClassDecorator`.
        """
        return {
            self.NEPTUNE_HOST_KEY: self.host,
            self.NEPTUNE_PROJECT_KEY: self.project,
            self.LINK_TYPE_KEY: NEPTUNE_RUN_VALUE,
        }
38 changes: 38 additions & 0 deletions plugins/flytekit-neptune/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from setuptools import setup

PLUGIN_NAME = "neptune"


# Name of the distribution, derived from the plugin name.
microlib_name = f"flytekitplugins-{PLUGIN_NAME}"

# Runtime dependencies with their minimum supported versions.
plugin_requires = ["flytekit>=1.13.3", "neptune>=1.10.4"]

__version__ = "0.0.0+develop"

# Trove classifiers, kept in a named list so the setup() call stays short.
package_classifiers = [
    "Intended Audience :: Science/Research",
    "Intended Audience :: Developers",
    "License :: OSI Approved :: Apache Software License",
    "Programming Language :: Python :: 3.8",
    "Programming Language :: Python :: 3.9",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
    "Programming Language :: Python :: 3.12",
    "Topic :: Scientific/Engineering",
    "Topic :: Scientific/Engineering :: Artificial Intelligence",
    "Topic :: Software Development",
    "Topic :: Software Development :: Libraries",
    "Topic :: Software Development :: Libraries :: Python Modules",
]

# All package metadata in one mapping; expanded into keyword arguments below.
setup_options = {
    "name": microlib_name,
    "version": __version__,
    "author": "flyteorg",
    "author_email": "[email protected]",
    "description": "This package enables seamless use of Neptune within Flyte",
    "namespace_packages": ["flytekitplugins"],
    "packages": [f"flytekitplugins.{PLUGIN_NAME}"],
    "install_requires": plugin_requires,
    "license": "apache2",
    "python_requires": ">=3.8",
    "classifiers": package_classifiers,
}

setup(**setup_options)
101 changes: 101 additions & 0 deletions plugins/flytekit-neptune/tests/test_neptune_init_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from unittest.mock import patch, Mock

from flytekit import Secret, task, current_context
from flytekit.core.context_manager import FlyteContextManager
from flytekitplugins.neptune import neptune_init_run
from flytekitplugins.neptune.tracking import _neptune_init_run_class

neptune_api_token = Secret(key="neptune_api_token", group="neptune_group")


def test_get_extra_config():
    """The extra config carries the default Neptune host and the configured project."""
    decorate = neptune_init_run(project="flytekit/project", secret=neptune_api_token, tags=["my-tag"])

    def my_task() -> bool:
        ...

    # Apply the decorator explicitly; same effect as the `@` syntax.
    wrapped = decorate(my_task)

    extra_config = wrapped.get_extra_config()
    host_value = extra_config[wrapped.NEPTUNE_HOST_KEY]
    project_value = extra_config[wrapped.NEPTUNE_PROJECT_KEY]

    assert host_value == "https://app.neptune.ai"
    assert project_value == "flytekit/project"


# Shared task used by the tests below: reports whether the decorator exposed a
# Neptune run on the task's context.
@task
@neptune_init_run(project="flytekit/project", secret=neptune_api_token, tags=["my-tag"])
def neptune_task() -> bool:
    active_run = current_context().neptune_run
    return active_run is not None


@patch("flytekitplugins.neptune.tracking.neptune")
def test_local_project_and_init_run_kwargs(neptune_mock):
    """Running the task directly starts a run with the project and extra kwargs only."""
    # The task returns True when a run object was available on its context.
    assert neptune_task()

    # No `api_token` is expected here: only the project and user-supplied kwargs.
    expected_kwargs = {"project": "flytekit/project", "tags": ["my-tag"]}
    neptune_mock.init_run.assert_called_with(**expected_kwargs)


class RunObjectMock(dict):
    """Minimal stand-in for a Neptune run.

    Item assignment is stored in the underlying dict so tests can read the logged
    metadata back, and `stop()` records that it was called.
    """

    def __init__(self):
        # Initialise the dict base class explicitly before adding our own state.
        super().__init__()
        self._stop_called = False

    def stop(self):
        """Record that the run was asked to stop."""
        self._stop_called = True


@patch.object(_neptune_init_run_class, "_get_secret")
@patch.object(_neptune_init_run_class, "_is_local_execution")
@patch("flytekitplugins.neptune.tracking.neptune")
def test_remote_project_and_init_run_kwargs(
    neptune_mock,
    mock_is_local_execution,
    mock_get_secret,
    monkeypatch,
):
    """A non-local execution passes the API token, logs Flyte metadata and stops the run."""
    # Pretend that the execution is remote
    mock_is_local_execution.return_value = False
    api_token = "this-is-my-api-token"
    mock_get_secret.return_value = api_token

    host_name = "ff59abade1e7f4758baf-mainmytask-0"
    execution_url = "https://my-host.com/execution_url"
    monkeypatch.setenv("HOSTNAME", host_name)
    monkeypatch.setenv("FLYTE_EXECUTION_URL", execution_url)

    # Use a dict-backed run so the metadata written by the decorator can be inspected.
    run_mock = RunObjectMock()
    init_run_mock = Mock(return_value=run_mock)
    neptune_mock.init_run = init_run_mock

    neptune_task()

    init_run_mock.assert_called_with(project="flytekit/project", tags=["my-tag"], api_token=api_token)
    assert run_mock["flyte/execution_id"] == host_name
    assert run_mock["flyte/execution_url"] == execution_url
    # The decorator must close the run once the task has finished.
    assert run_mock._stop_called


def test_get_secret_callable():
    """A callable secret is invoked and its return value is used as the token."""

    def get_secret():
        return "abc-123"

    def my_task():
        pass

    decorate = neptune_init_run(project="flytekit/project", secret=get_secret, tags=["my-tag"])
    wrapped = decorate(my_task)

    # The context is irrelevant for callable secrets, so any mock will do.
    resolved = wrapped._get_secret(Mock())
    assert resolved == "abc-123"


def test_get_secret_object():
    """A `Secret` is resolved through the context's secrets manager by key and group."""
    secret_obj = Secret(key="my_key", group="my_group")

    def my_task():
        pass

    wrapped = neptune_init_run(project="flytekit/project", secret=secret_obj, tags=["my-tag"])(my_task)

    # Fake context whose secrets manager returns a fixed value.
    fake_ctx = Mock()
    secrets_get = Mock(return_value="my-secret-value")
    fake_ctx.user_space_params.secrets.get = secrets_get

    resolved = wrapped._get_secret(fake_ctx)

    assert resolved == "my-secret-value"
    secrets_get.assert_called_with(key="my_key", group="my_group")

0 comments on commit 64c56f8

Please sign in to comment.