diff --git a/plugins/flytekit-skyplane/README.md b/plugins/flytekit-skyplane/README.md
new file mode 100644
index 0000000000..94a70a2c88
--- /dev/null
+++ b/plugins/flytekit-skyplane/README.md
@@ -0,0 +1,21 @@
+# Flyte Skyplane Plugin
+
+This plugin adds the capability of transferring data using Skyplane to Flyte. Skyplane allows for significantly faster and cheaper data transfers compared to traditional methods.
+
+## Overview
+
+The Flyte Skyplane Plugin provides an interface for transferring data between locations using Skyplane's capabilities. This plugin can be used within Flyte workflows to enhance data transfer efficiency.
+
+## Features
+
+- **Faster Data Transfers**: Utilizes Skyplane for high-speed data transfers.
+- **Cost Efficiency**: Reduces costs associated with traditional data transfer methods.
+- **Easy Integration**: Seamlessly integrates with Flyte's task system.
+
+## Installation
+
+To install the plugin, run the following command:
+
+```bash
+pip install flytekitplugins-skyplane
+```
diff --git a/plugins/flytekit-skyplane/flytekitplugins/skyplane/__init__.py b/plugins/flytekit-skyplane/flytekitplugins/skyplane/__init__.py
new file mode 100644
index 0000000000..0e98d8c555
--- /dev/null
+++ b/plugins/flytekit-skyplane/flytekitplugins/skyplane/__init__.py
@@ -0,0 +1,16 @@
+"""
+.. currentmodule:: flytekitplugins.skyplane
+
+This package provides functionality for integrating Skyplane into Flyte workflows.
+
+.. autosummary::
+   :template: custom.rst
+   :toctree: generated/
+
+   SkyplaneFunctionTask
+   SkyplaneJob
+"""
+
+from .tasks import SkyplaneFunctionTask, SkyplaneJob
+
+__all__ = ["SkyplaneFunctionTask", "SkyplaneJob"]
diff --git a/plugins/flytekit-skyplane/flytekitplugins/skyplane/tasks.py b/plugins/flytekit-skyplane/flytekitplugins/skyplane/tasks.py
new file mode 100644
index 0000000000..5134d6b603
--- /dev/null
+++ b/plugins/flytekit-skyplane/flytekitplugins/skyplane/tasks.py
@@ -0,0 +1,98 @@
+"""
+This Plugin adds the capability of transferring data using Skyplane to Flyte.
+Skyplane allows for significantly faster and cheaper data transfers compared to traditional methods.
+"""
+
+import subprocess
+from dataclasses import dataclass, field
+from typing import Any, Callable, Dict, List, Optional
+
+from flytekit import PythonFunctionTask
+from flytekit.configuration import SerializationSettings
+from flytekit.extend import TaskPlugins
+
+import skyplane  # noqa: F401 - imported so a missing Skyplane install fails at import time
+
+
+@dataclass
+class SkyplaneJob:
+    """
+    Configuration for a Skyplane transfer job.
+
+    Args:
+        source: Source location from where data will be transferred.
+        destination: Destination location where data will be transferred.
+        options: Additional options for the Skyplane transfer. Each entry is
+            rendered as a ``--key=value`` command line flag.
+    """
+
+    source: str
+    destination: str
+    options: Optional[Dict[str, Any]] = field(default_factory=dict)
+
+
+class SkyplaneFunctionTask(PythonFunctionTask[SkyplaneJob]):
+    """
+    A Flyte task that uses Skyplane to transfer data.
+    """
+
+    _SKYPLANE_JOB_TYPE = "skyplane"
+
+    def __init__(self, task_config: SkyplaneJob, task_function: Callable, **kwargs):
+        super().__init__(
+            task_config=task_config,
+            task_function=task_function,
+            task_type=self._SKYPLANE_JOB_TYPE,
+            **kwargs,
+        )
+
+    def _build_skyplane_command(self) -> List[str]:
+        """
+        Build the Skyplane command line from the task configuration.
+
+        Returns:
+            The ``skyplane transfer <source> <destination>`` argument list,
+            followed by one ``--key=value`` flag per configured option.
+        """
+        # NOTE(review): the Skyplane CLI documents ``cp`` and ``sync`` subcommands;
+        # confirm that ``transfer`` is accepted by the installed version.
+        skyplane_cmd = ["skyplane", "transfer", self.task_config.source, self.task_config.destination]
+        # ``options`` is Optional, so treat ``None`` the same as "no options".
+        options = self.task_config.options or {}
+        skyplane_cmd.extend(f"--{key}={value}" for key, value in options.items())
+        return skyplane_cmd
+
+    def execute(self, **kwargs) -> None:
+        """
+        Run the Skyplane data transfer.
+
+        Raises:
+            subprocess.CalledProcessError: If the Skyplane command exits with a non-zero status.
+        """
+        skyplane_cmd = self._build_skyplane_command()
+
+        # The command is passed as a list (no shell), so source, destination
+        # and option values are never interpreted by a shell.
+        try:
+            result = subprocess.run(skyplane_cmd, capture_output=True, text=True, check=True)
+        except subprocess.CalledProcessError as e:
+            print(f"Skyplane transfer failed: {e.stderr}")
+            raise
+        print(f"Skyplane transfer successful: {result.stdout}")
+
+    def get_command(self, settings: SerializationSettings) -> List[str]:
+        """
+        Constructs the command to be executed for the Skyplane transfer.
+
+        Args:
+            settings: Serialization settings for the Flyte task.
+
+        Returns:
+            The Skyplane command line followed by the command returned by
+            the parent class's ``get_command``.
+        """
+        return self._build_skyplane_command() + super().get_command(settings)
+
+
+# Register the Skyplane Plugin into the Flytekit core plugin system
+TaskPlugins.register_pythontask_plugin(SkyplaneJob, SkyplaneFunctionTask)
diff --git a/plugins/flytekit-skyplane/setup.py b/plugins/flytekit-skyplane/setup.py
new file mode 100644
index 0000000000..899d6c7d02
--- /dev/null
+++ b/plugins/flytekit-skyplane/setup.py
@@ -0,0 +1,44 @@
+from setuptools import setup
+
+PLUGIN_NAME = "skyplane"
+
+microlib_name = f"flytekitplugins-{PLUGIN_NAME}"
+
+plugin_requires = [
+    "cloudpickle",
+    "flyteidl>=1.5.1",
+    "flytekit>=1.6.1",
+    "kubernetes",
+    "skyplane",
+]
+
+__version__ = "0.0.0+develop"
+
+setup(
+    name=microlib_name,
+    version=__version__,
+    author="flyteorg",
+    author_email="admin@flyte.org",
+    description="Skyplane data transfer plugin for Flytekit",
+    namespace_packages=["flytekitplugins"],
+    packages=[f"flytekitplugins.{PLUGIN_NAME}"],
+    install_requires=plugin_requires,
+    extras_require={
+        # Add any optional dependencies if needed
+    },
+    license="apache2",
+    python_requires=">=3.8",
+    classifiers=[
+        "Intended Audience :: Science/Research",
+        "Intended Audience :: Developers",
+        "License :: OSI Approved :: Apache Software License",
+        "Programming Language :: Python :: 3.8",
+        "Programming Language :: Python :: 3.9",
+        "Programming Language :: Python :: 3.10",
+        "Topic :: Scientific/Engineering",
+        "Topic :: Scientific/Engineering :: Artificial Intelligence",
+        "Topic :: Software Development",
+        "Topic :: Software Development :: Libraries",
+        "Topic :: Software Development :: Libraries :: Python Modules",
+    ],
+)
diff --git a/plugins/flytekit-skyplane/tests/testscript.py b/plugins/flytekit-skyplane/tests/testscript.py
new file mode 100644
index 0000000000..d7c0c7c6f3
--- /dev/null
+++ b/plugins/flytekit-skyplane/tests/testscript.py
@@ -0,0 +1,22 @@
+from flytekit import task, workflow
+from flytekitplugins.skyplane import SkyplaneFunctionTask, SkyplaneJob
+
+
+@task
+def dummy_data_transfer_task():
+    job_config = SkyplaneJob(
+        source="s3://my-source-bucket/data/",
+        destination="s3://my-destination-bucket/data/",
+        options={"overwrite": "true"},
+    )
+    skyplane_task = SkyplaneFunctionTask(task_config=job_config, task_function=lambda: "Data transferred!")
+    return skyplane_task
+
+
+@workflow
+def test_workflow():
+    return dummy_data_transfer_task()
+
+
+if __name__ == "__main__":
+    print(test_workflow())