Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make MatchableResources configurable through flyte-cli #118

Merged
merged 5 commits into from
Jun 4, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

import flytekit.plugins

__version__ = '0.9.0b0'
__version__ = '0.9.0b1'
47 changes: 46 additions & 1 deletion flytekit/clients/friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

from flyteidl.admin import task_pb2 as _task_pb2, common_pb2 as _common_pb2, workflow_pb2 as _workflow_pb2, \
launch_plan_pb2 as _launch_plan_pb2, execution_pb2 as _execution_pb2, node_execution_pb2 as _node_execution_pb2, \
task_execution_pb2 as _task_execution_pb2, project_pb2 as _project_pb2
task_execution_pb2 as _task_execution_pb2, project_pb2 as _project_pb2, project_domain_attributes_pb2 as \
_project_domain_attributes_pb2, workflow_attributes_pb2 as _workflow_attributes_pb2
from flyteidl.core import identifier_pb2 as _identifier_pb2

from flytekit.clients.raw import RawSynchronousFlyteClient as _RawSynchronousFlyteClient
Expand Down Expand Up @@ -885,3 +886,47 @@ def register_project(self, project):
project=project.to_flyte_idl(),
)
)

####################################################################################################################
#
# Matching Attributes Endpoints
#
####################################################################################################################

def update_project_domain_attributes(self, project, domain, matching_attributes):
"""
Sets custom attributes for a project and domain combination.
:param Text project:
:param Text domain:
:param flytekit.models.MatchingAttributes matching_attributes:
:return:
"""
super(SynchronousFlyteClient, self).update_project_domain_attributes(
_project_domain_attributes_pb2.ProjectDomainAttributesUpdateRequest(
attributes=_project_domain_attributes_pb2.ProjectDomainAttributes(
project=project,
domain=domain,
matching_attributes=matching_attributes.to_flyte_idl(),
)
)
)

def update_workflow_attributes(self, project, domain, workflow, matching_attributes):
"""
Sets custom attributes for a project, domain, and workflow combination.
:param Text project:
:param Text domain:
:param Text workflow:
:param flytekit.models.MatchingAttributes matching_attributes:
:return:
"""
super(SynchronousFlyteClient, self).update_workflow_attributes(
_workflow_attributes_pb2.WorkflowAttributesUpdateRequest(
attributes=_workflow_attributes_pb2.WorkflowAttributes(
project=project,
domain=domain,
workflow=workflow,
matching_attributes=matching_attributes.to_flyte_idl(),
)
)
)
24 changes: 24 additions & 0 deletions flytekit/clients/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,30 @@ def register_project(self, project_register_request):
"""
return self._stub.RegisterProject(project_register_request, metadata=self._metadata)

####################################################################################################################
#
# Matching Attributes Endpoints
#
####################################################################################################################
@_handle_rpc_error
def update_project_domain_attributes(self, project_domain_attributes_update_request):
"""
This updates the attributes for a project and domain registered with the Flyte Admin Service
:param flyteidl.admin..ProjectDomainAttributesUpdateRequest project_domain_attributes_update_request:
:rtype: flyteidl.admin..ProjectDomainAttributesUpdateResponse
"""
return self._stub.UpdateProjectDomainAttributes(project_domain_attributes_update_request,
metadata=self._metadata)

@_handle_rpc_error
def update_workflow_attributes(self, workflow_attributes_update_request):
"""
This updates the attributes for a project, domain, and workflow registered with the Flyte Admin Service
:param flyteidl.admin..UpdateWorkflowAttributes workflow_attributes_update_request:
:rtype: flyteidl.admin..workflow_attributes_update_requestResponse
"""
return self._stub.UpdateWorkflowAttributes(workflow_attributes_update_request, metadata=self._metadata)

####################################################################################################################
#
# Event Endpoints
Expand Down
72 changes: 72 additions & 0 deletions flytekit/clis/flyte_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from flytekit.models.admin import common as _admin_common
from flytekit.models.core import execution as _core_execution_models, identifier as _core_identifier
from flytekit.models.execution import ExecutionSpec as _ExecutionSpec, ExecutionMetadata as _ExecutionMetadata
from flytekit.models.matchable_resource import ClusterResourceAttributes as _ClusterResourceAttributes,\
ExecutionQueueAttributes as _ExecutionQueueAttributes, MatchingAttributes as _MatchingAttributes
from flytekit.models.project import Project as _Project
from flytekit.models.schedule import Schedule as _Schedule
from flytekit.common.exceptions import user as _user_exceptions
Expand Down Expand Up @@ -1719,6 +1721,76 @@ def update_launch_plan_meta(description, host, insecure, project, domain, name):
_click.echo("Successfully updated launch plan")


@_flyte_cli.command('update-cluster-resource-attributes', cls=_FlyteSubCommand)
@_host_option
@_insecure_option
@_project_option
@_domain_option
@_optional_name_option
@_click.option('--attributes', type=(str, str), multiple=True)
def update_cluster_resource_attributes(host, insecure, project, domain, name, attributes):
"""
Sets matchable cluster resource attributes for a project, domain and optionally, workflow name.

Use a -- to separate arguments to this cli, and attributes used for cluster resource configuration.
e.g.
$ flyte-cli -h localhost:30081 -p flyteexamples -d development update-cluster-resource-attributes \
--attributes cpu 1 --attributes memory 500M
"""
_welcome_message()
client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure)
cluster_resource_attributes = _ClusterResourceAttributes({attribute[0]: attribute[1] for attribute in attributes})
matching_attributes = _MatchingAttributes(cluster_resource_attributes=cluster_resource_attributes)

if name is not None:
client.update_workflow_attributes(
project, domain, name, matching_attributes
)
_click.echo("Successfully updated cluster resource attributes for project: {}, domain: {}, and workflow: {}".
format(project, domain, name))
else:
client.update_project_domain_attributes(
project, domain, matching_attributes
)
_click.echo("Successfully updated cluster resource attributes for project: {} and domain: {}".
format(project, domain))


@_flyte_cli.command('update-execution-queue-attributes', cls=_FlyteSubCommand)
@_host_option
@_insecure_option
@_project_option
@_domain_option
@_optional_name_option
@_click.option("--tags", multiple=True, help="Tag(s) to be applied.")
def update_execution_queue_attributes(host, insecure, project, domain, name, tags):
"""
Tags used for assigning execution queues for tasks belonging to a project, domain and optionally, workflow name.

Use a -- to separate arguments to this cli, and attributes used for cluster resource configuration.
e.g.
$ flyte-cli -h localhost:30081 -p flyteexamples -d development update-cluster-resource-attributes \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy paste?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep thanks for catching

--tags critical --tags gpu_intensive
"""
_welcome_message()
client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure)
execution_queue_attributes = _ExecutionQueueAttributes(list(tags))
matching_attributes = _MatchingAttributes(execution_queue_attributes=execution_queue_attributes)

if name is not None:
client.update_workflow_attributes(
project, domain, name, matching_attributes
)
_click.echo("Successfully updated execution queue attributes for project: {}, domain: {}, and workflow: {}".
format(project, domain, name))
else:
client.update_project_domain_attributes(
project, domain, matching_attributes
)
_click.echo("Successfully updated execution queue attributes for project: {} and domain: {}".
format(project, domain))


@_flyte_cli.command('setup-config', cls=_click.Command)
@_host_option
@_insecure_option
Expand Down
130 changes: 130 additions & 0 deletions flytekit/models/matchable_resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
from flyteidl.admin import matchable_resource_pb2 as _matchable_resource
from flytekit.models import common as _common


class ClusterResourceAttributes(_common.FlyteIdlEntity):

def __init__(self, attributes):
"""
Custom resource attributes which will be applied in cluster resource creation (e.g. quotas).
Dict keys are the *case-sensitive* names of variables in templatized resource files.
Dict values should be the custom values which get substituted during resource creation.

:param dict[Text, Text] attributes: Applied in cluster resource creation (e.g. quotas).
"""
self._attributes = attributes

@property
def attributes(self):
"""
Custom resource attributes which will be applied in cluster resource management
:rtype: dict[Text, Text]
"""
return self._attributes

def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.matchable_resource_pb2.ClusterResourceAttributes
"""
return _matchable_resource.ClusterResourceAttributes(
attributes=self.attributes,
)

@classmethod
def from_flyte_idl(cls, pb2_object):
"""
:param flyteidl.admin.matchable_resource_pb2.ClusterResourceAttributes pb2_object:
:rtype: ClusterResourceAttributes
"""
return cls(
attributes=pb2_object.attributes,
)


class ExecutionQueueAttributes(_common.FlyteIdlEntity):

def __init__(self, tags):
"""
Tags used for assigning execution queues for tasks matching a project, domain and optionally, workflow.

:param list[Text] tags:
"""
self._tags = tags

@property
def tags(self):
"""
:rtype: list[Text]
"""
return self._tags

def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.matchable_resource_pb2.ExecutionQueueAttributes
"""
return _matchable_resource.ExecutionQueueAttributes(
tags=self.tags,
)

@classmethod
def from_flyte_idl(cls, pb2_object):
"""
:param flyteidl.admin.matchable_resource_pb2.ExecutionQueueAttributes pb2_object:
:rtype: ExecutionQueueAttributes
"""
return cls(
tags=pb2_object.tags,
)


class MatchingAttributes(_common.FlyteIdlEntity):
def __init__(self, cluster_resource_attributes=None, execution_queue_attributes=None):
"""
At most one target from cluster_resource_attributes or execution_queue_attributes can be set.
:param ClusterResourceAttributes cluster_resource_attributes:
:param ExecutionQueueAttributes execution_queue_attributes:
"""
if cluster_resource_attributes and execution_queue_attributes:
raise ValueError("Only one of cluster_resource_attributes or execution_queue_attributes can be set")
self._cluster_resource_attributes = cluster_resource_attributes
self._execution_queue_attributes = execution_queue_attributes

@property
def cluster_resource_attributes(self):
"""
Custom resource attributes which will be applied in cluster resource creation (e.g. quotas).
:rtype: ClusterResourceAttributes
"""
return self._cluster_resource_attributes

@property
def execution_queue_attributes(self):
"""
Tags used for assigning execution queues for tasks.
:rtype: ExecutionQueueAttributes
"""
return self._execution_queue_attributes

def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.matchable_resource_pb2.MatchingAttributes
"""
return _matchable_resource.MatchingAttributes(
cluster_resource_attributes=self.cluster_resource_attributes.to_flyte_idl() if
self.cluster_resource_attributes else None,
execution_queue_attributes=self.execution_queue_attributes.to_flyte_idl() if self.execution_queue_attributes
else None,
)

@classmethod
def from_flyte_idl(cls, pb2_object):
"""
:param flyteidl.admin.matchable_resource_pb2.MatchingAttributes pb2_object:
:rtype: MatchingAttributes
"""
return cls(
cluster_resource_attributes=ClusterResourceAttributes.from_flyte_idl(
pb2_object.cluster_resource_attributes) if pb2_object.HasField("cluster_resource_attributes") else None,
execution_queue_attributes=ExecutionQueueAttributes.from_flyte_idl(pb2_object.execution_queue_attributes) if
pb2_object.HasField("execution_queue_attributes") else None,
)
27 changes: 27 additions & 0 deletions tests/flytekit/unit/models/test_matchable_resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from __future__ import absolute_import

from flytekit.models import matchable_resource


def test_cluster_resource_attributes():
obj = matchable_resource.ClusterResourceAttributes({"cpu": "one million", "gpu": "just one"})
assert obj.attributes == {"cpu": "one million", "gpu": "just one"}
assert obj == matchable_resource.ClusterResourceAttributes.from_flyte_idl(obj.to_flyte_idl())


def test_execution_queue_attributes():
obj = matchable_resource.ExecutionQueueAttributes(["foo", "bar", "baz"])
assert obj.tags == ["foo", "bar", "baz"]
assert obj == matchable_resource.ExecutionQueueAttributes.from_flyte_idl(obj.to_flyte_idl())


def test_matchable_resource():
cluster_resource_attrs = matchable_resource.ClusterResourceAttributes({"cpu": "one million", "gpu": "just one"})
obj = matchable_resource.MatchingAttributes(cluster_resource_attributes=cluster_resource_attrs)
assert obj.cluster_resource_attributes == cluster_resource_attrs
assert obj == matchable_resource.MatchingAttributes.from_flyte_idl(obj.to_flyte_idl())

execution_queue_attributes = matchable_resource.ExecutionQueueAttributes(["foo", "bar", "baz"])
obj2 = matchable_resource.MatchingAttributes(execution_queue_attributes=execution_queue_attributes)
assert obj2.execution_queue_attributes == execution_queue_attributes
assert obj2 == matchable_resource.MatchingAttributes.from_flyte_idl(obj2.to_flyte_idl())