Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

document flytekit remote module #564

Merged
merged 4 commits into from
Jul 26, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions docs/source/design/control_plane.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@
FlyteRemote: A Programmatic Control Plane Interface
###################################################

For those who require programmatic access to the control plane, :mod:`~flytekit.remote` module enables you to perform
For those who require programmatic access to the control plane, the :mod:`~flytekit.remote` module enables you to perform
certain operations in a python runtime environment.

Since this section naturally deals with the control plane, this discussion is only relevant for those who have a Flyte
backend set up, and have access to it (a local sandbox will suffice as well of course). These objects do not rely on the
underlying code they represent being locally available.
backend set up and have access to it (a :std:ref:`local sandbox <flyte:deployment-sandbox>` will suffice as well).

***************************
Create a FlyteRemote Object
***************************

The :class:`~flytekit.remote.FlyteRemote` class is the entrypoint for programmatically performing operations in a python
The :class:`~flytekit.remote.remote.FlyteRemote` class is the entrypoint for programmatically performing operations in a python
cosmicBboy marked this conversation as resolved.
Show resolved Hide resolved
runtime. There are two ways of creating a remote object.

**Initialize directly**
Expand All @@ -24,14 +23,19 @@ runtime. There are two ways of creating a remote object.

from flytekit.remote import FlyteRemote

remote = FlyteRemote(default_project="project", default_domain="domain", flyte_admin_url="<url>", insecure=True)
remote = FlyteRemote(
default_project="project",
default_domain="domain",
flyte_admin_url="<url>",
insecure=True,
)

**Initialize from flyte config**

.. TODO: link documentation to flyte config and environment variables

This will initialize a :class:`~flytekit.remote.FlyteRemote` object from your flyte config file or environment variable
overrides
This will initialize a :class:`~flytekit.remote.remote.FlyteRemote` object from your flyte config file or environment variable
overrides:

.. code-block:: python

Expand All @@ -51,7 +55,7 @@ Fetching Flyte Admin Entities
Executing Entities
******************

You can execute all of these flyte entities, which returns a :class:`~flytekit.remote.FlyteWorkflowExecution` object.
You can execute all of these flyte entities, which returns a :class:`~flytekit.remote.workflow_execution.FlyteWorkflowExecution` object.
For more information on flyte entities, see the See the :ref:`remote flyte entities <remote-flyte-execution-objects>`
reference.

Expand All @@ -64,22 +68,23 @@ reference.
Waiting for Execution Completion
********************************

You can use the :meth:`~flytekit.remote.remote.FlyteRemote.wait` method to synchronously wait for the execution to complete:

.. code-block:: python

completed_execution = remote.wait(execution)
cosmicBboy marked this conversation as resolved.
Show resolved Hide resolved

You can also pass in ``wait=True`` to the :method:`~flytekit.remote.FlyteRemote.execute` method to synchronously wait
for the execution to complete before returning the execution object:
You can also pass in ``wait=True`` to the :meth:`~flytekit.remote.remote.FlyteRemote.execute` method.

.. code-block:: python

execution = remote.execute(flyte_entity, inputs={...}, wait=True)
completed_execution = remote.execute(flyte_entity, inputs={...}, wait=True)

********************
Syncing Remote State
********************

Use the :method:`~flytekit.remote.FlyteRemote.sync` method to sync the entity object's state with the remote state
Use the :meth:`~flytekit.remote.remote.FlyteRemote.sync` method to sync the entity object's state with the remote state

.. code-block:: python

Expand Down
4 changes: 2 additions & 2 deletions docs/source/design/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Flytekit is comprised of a handful of different logical components, each discuss

* :ref:`Models Files <design-models>` - These are almost Protobuf generated files.
* :ref:`Authoring <design-authoring>` - This provides the core Flyte authoring experiences, allowing users to write tasks, workflows, and launch plans.
* :ref:`Control Plane <design-control-plane>` - The code here allows users to interact with the control plane through Python objecs.
* :ref:`Control Plane <design-control-plane>` - The code here allows users to interact with the control plane through Python objects.
* :ref:`Execution <design-execution>` - A small shim layer basically that handles interaction with the Flyte ecosystem at execution time.
* :ref:`CLIs and Clients <design-clis>` - Command line tools users may find themselves interacting with and the control plane client the CLIs call.

Expand All @@ -19,6 +19,6 @@ Flytekit is comprised of a handful of different logical components, each discuss

models
authoring
control_plane
Control Plane <control_plane>
execution
clis
138 changes: 100 additions & 38 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,10 @@ def from_config(
raw_output_data_prefix = raw_output_data_prefix if raw_output_data_prefix else None

return cls(
default_project=default_project or PROJECT.get(),
default_domain=default_domain or DOMAIN.get(),
flyte_admin_url=platform_config.URL.get(),
insecure=platform_config.INSECURE.get(),
default_project=default_project or PROJECT.get() or None,
default_domain=default_domain or DOMAIN.get() or None,
file_access=FileAccessProvider(
local_sandbox_dir=sdk_config.LOCAL_SANDBOX.get(),
remote_proxy={
Expand All @@ -142,10 +142,10 @@ def from_config(

def __init__(
self,
default_project: str,
default_domain: str,
flyte_admin_url: str,
insecure: bool,
default_project: typing.Optional[str] = None,
default_domain: typing.Optional[str] = None,
file_access: typing.Optional[FileAccessProvider] = None,
auth_role: typing.Optional[common_models.AuthRole] = None,
notifications: typing.Optional[typing.List[common_models.Notification]] = None,
Expand All @@ -156,10 +156,10 @@ def __init__(
):
"""Initilize a FlyteRemote object.

:param default_project: default project to use when fetching or executing flyte entities.
:param default_domain: default domain to use when fetching or executing flyte entities.
:param flyte_admin_url: url pointing to the remote backend.
:param insecure: whether or not the enable SSL.
:param default_project: default project to use when fetching or executing flyte entities.
:param default_domain: default domain to use when fetching or executing flyte entities.
:param file_access: file access provider to use for offloading non-literal inputs/outputs.
:param auth_role: auth role config
:param notifications: notification config
Expand Down Expand Up @@ -436,25 +436,6 @@ def _serialize(
# Register Entities #
#####################

def _resolve_identifier_kwargs(
self,
entity,
project: typing.Optional[str],
domain: typing.Optional[str],
name: typing.Optional[str],
version: typing.Optional[str],
):
"""
Resolves the identifier attributes based on user input, falling back on default project/domain and
auto-generated version.
"""
return {
"project": project or self.default_project,
"domain": domain or self.default_domain,
"name": name or entity.name,
"version": version or self.version,
}

@singledispatchmethod
def register(
self,
Expand Down Expand Up @@ -535,17 +516,68 @@ def _(
# Execute Entities #
####################

def _resolve_identifier_kwargs(
self,
entity,
project: typing.Optional[str],
domain: typing.Optional[str],
name: typing.Optional[str],
version: typing.Optional[str],
):
"""
Resolves the identifier attributes based on user input, falling back on the default project/domain and
auto-generated version, and ultimately the entity project/domain if entity is a remote flyte entity.
"""
error_msg = (
"entity {entity} of type {entity_type} is not associated with a {arg_name}. Please specify the {arg_name} "
"argument when invoking the FlyteRemote.execute method or a default_{arg_name} value when initializig the "
"FlyteRemote object."
)

if project:
resolved_project, msg_project = project, "method"
cosmicBboy marked this conversation as resolved.
Show resolved Hide resolved
elif self.default_project:
resolved_project, msg_project = self.default_project, "remote"
elif hasattr(entity, "id"):
resolved_project, msg_project = entity.id.project, "entity"
else:
raise TypeError(error_msg.format(entity=entity, entity_type=type(entity), arg_name="project"))

if domain:
resolved_domain, msg_domain = domain, "execute-method"
elif self.default_domain:
resolved_domain, msg_domain = self.default_domain, "remote"
elif hasattr(entity, "id"):
resolved_domain, msg_domain = entity.id.domain, "entity"
else:
raise TypeError(error_msg.format(entity=entity, entity_type=type(entity), arg_name="project"))
cosmicBboy marked this conversation as resolved.
Show resolved Hide resolved

remote_logger.debug(
f"Using {msg_project}-supplied value for project and {msg_domain}-supplied value for domain."
)

return {
cosmicBboy marked this conversation as resolved.
Show resolved Hide resolved
"project": resolved_project,
"domain": resolved_domain,
"name": name or entity.name,
"version": version or self.version,
}

def _execute(
self,
flyte_id: Identifier,
inputs: typing.Dict[str, typing.Any],
project: str,
domain: str,
execution_name: typing.Optional[str] = None,
wait: bool = False,
) -> FlyteWorkflowExecution:
"""Common method for execution across all entities.

:param flyte_id: entity identifier
:param inputs: dictionary mapping argument names to values
:param project: project on which to execute the entity referenced by flyte_id
:param domain: domain on which to execute the entity referenced by flyte_id
:param execution_name: name of the execution
:param wait: if True, waits for execution to complete
:returns: :class:`~flytekit.remote.workflow_execution.FlyteWorkflowExecution`
Expand All @@ -567,8 +599,8 @@ def _execute(
# in the case that I want to use a flyte entity from e.g. project "A" but actually execute the entity on a
# different project "B". For now, this method doesn't support this use case.
exec_id = self.client.create_execution(
flyte_id.project,
flyte_id.domain,
project,
domain,
execution_name,
ExecutionSpec(
flyte_id,
Expand Down Expand Up @@ -623,10 +655,10 @@ def execute(
:param execution_name: name of the execution. If None, uses auto-generated value.
:param wait: if True, waits for execution to complete

.. note::
.. note:

The ``project``, ``domain``, ``name`` and ``version`` arguments do not apply to ``FlyteTask``,
``FlyteLaunchPlan``, and ``FlyteWorkflow`` entity inputs.
The ``name`` and ``version`` arguments do not apply to ``FlyteTask``, ``FlyteLaunchPlan``, and
``FlyteWorkflow`` entity inputs. These values are determined by referencing the entity identifier values.
"""
raise NotImplementedError(f"entity type {type(entity)} not recognized for execution")

Expand All @@ -648,10 +680,17 @@ def _(
) -> FlyteWorkflowExecution:
"""Execute a FlyteTask, or FlyteLaunchplan.

NOTE: the project, domain, name, and version arguments are currently not used and only there consistency in
the function signature
NOTE: the name and version arguments are currently not used and only there consistency in the function signature
cosmicBboy marked this conversation as resolved.
Show resolved Hide resolved
"""
return self._execute(entity.id, inputs, execution_name, wait)
flyte_id_kwargs = self._resolve_identifier_kwargs(entity, project, domain, entity.id.name, entity.id.version)
return self._execute(
entity.id,
inputs,
project=flyte_id_kwargs["project"],
domain=flyte_id_kwargs["domain"],
execution_name=execution_name,
wait=wait,
)

@execute.register
def _(
Expand All @@ -667,12 +706,14 @@ def _(
) -> FlyteWorkflowExecution:
"""Execute a FlyteWorkflow.

NOTE: the project, domain, name, and version arguments are currently not used and only there consistency in
the function signature
NOTE: the name and version arguments are currently not used and only there consistency in the function signature
"""
flyte_id_kwargs = self._resolve_identifier_kwargs(entity, project, domain, entity.id.name, entity.id.version)
return self.execute(
self.fetch_launch_plan(entity.id.project, entity.id.domain, entity.id.name, entity.id.version),
inputs,
project=flyte_id_kwargs["project"],
domain=flyte_id_kwargs["domain"],
execution_name=execution_name,
wait=wait,
)
Expand All @@ -698,7 +739,14 @@ def _(
flyte_task: FlyteTask = self.fetch_task(**flyte_id_kwargs)
except Exception:
flyte_task: FlyteTask = self.register(entity, **flyte_id_kwargs)
return self.execute(flyte_task, inputs, execution_name=execution_name, wait=wait)
return self.execute(
flyte_task,
inputs,
project=flyte_id_kwargs["project"],
domain=flyte_id_kwargs["domain"],
execution_name=execution_name,
wait=wait,
)

@execute.register
def _(
Expand All @@ -718,7 +766,14 @@ def _(
flyte_workflow: FlyteWorkflow = self.fetch_workflow(**flyte_id_kwargs)
except Exception:
flyte_workflow: FlyteWorkflow = self.register(entity, **flyte_id_kwargs)
return self.execute(flyte_workflow, inputs, execution_name=execution_name, wait=wait)
return self.execute(
flyte_workflow,
inputs,
project=flyte_id_kwargs["project"],
domain=flyte_id_kwargs["domain"],
execution_name=execution_name,
wait=wait,
)

@execute.register
def _(
Expand All @@ -738,7 +793,14 @@ def _(
flyte_launchplan: FlyteLaunchPlan = self.fetch_launch_plan(**flyte_id_kwargs)
except Exception:
flyte_launchplan: FlyteLaunchPlan = self.register(entity, **flyte_id_kwargs)
return self.execute(flyte_launchplan, inputs, execution_name=execution_name, wait=wait)
return self.execute(
flyte_launchplan,
inputs,
project=flyte_id_kwargs["project"],
domain=flyte_id_kwargs["domain"],
execution_name=execution_name,
wait=wait,
)

###################################
# Wait for Executions to Complete #
Expand Down