Skip to content

Commit

Permalink
Make MatchableResources configurable through flyte-cli (flyteorg#118)
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored Jun 4, 2020
1 parent 2a3aaf9 commit 6aa4aaa
Show file tree
Hide file tree
Showing 6 changed files with 399 additions and 2 deletions.
2 changes: 1 addition & 1 deletion flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

import flytekit.plugins

__version__ = '0.9.0b0'
__version__ = '0.9.0b1'
47 changes: 46 additions & 1 deletion flytekit/clients/friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

from flyteidl.admin import task_pb2 as _task_pb2, common_pb2 as _common_pb2, workflow_pb2 as _workflow_pb2, \
launch_plan_pb2 as _launch_plan_pb2, execution_pb2 as _execution_pb2, node_execution_pb2 as _node_execution_pb2, \
task_execution_pb2 as _task_execution_pb2, project_pb2 as _project_pb2
task_execution_pb2 as _task_execution_pb2, project_pb2 as _project_pb2, project_domain_attributes_pb2 as \
_project_domain_attributes_pb2, workflow_attributes_pb2 as _workflow_attributes_pb2
from flyteidl.core import identifier_pb2 as _identifier_pb2

from flytekit.clients.raw import RawSynchronousFlyteClient as _RawSynchronousFlyteClient
Expand Down Expand Up @@ -885,3 +886,47 @@ def register_project(self, project):
project=project.to_flyte_idl(),
)
)

####################################################################################################################
#
# Matching Attributes Endpoints
#
####################################################################################################################

def update_project_domain_attributes(self, project, domain, matching_attributes):
"""
Sets custom attributes for a project and domain combination.
:param Text project:
:param Text domain:
:param flytekit.models.MatchingAttributes matching_attributes:
:return:
"""
super(SynchronousFlyteClient, self).update_project_domain_attributes(
_project_domain_attributes_pb2.ProjectDomainAttributesUpdateRequest(
attributes=_project_domain_attributes_pb2.ProjectDomainAttributes(
project=project,
domain=domain,
matching_attributes=matching_attributes.to_flyte_idl(),
)
)
)

def update_workflow_attributes(self, project, domain, workflow, matching_attributes):
"""
Sets custom attributes for a project, domain, and workflow combination.
:param Text project:
:param Text domain:
:param Text workflow:
:param flytekit.models.MatchingAttributes matching_attributes:
:return:
"""
super(SynchronousFlyteClient, self).update_workflow_attributes(
_workflow_attributes_pb2.WorkflowAttributesUpdateRequest(
attributes=_workflow_attributes_pb2.WorkflowAttributes(
project=project,
domain=domain,
workflow=workflow,
matching_attributes=matching_attributes.to_flyte_idl(),
)
)
)
24 changes: 24 additions & 0 deletions flytekit/clients/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,30 @@ def register_project(self, project_register_request):
"""
return self._stub.RegisterProject(project_register_request, metadata=self._metadata)

####################################################################################################################
#
# Matching Attributes Endpoints
#
####################################################################################################################
@_handle_rpc_error
def update_project_domain_attributes(self, project_domain_attributes_update_request):
"""
This updates the attributes for a project and domain registered with the Flyte Admin Service
:param flyteidl.admin..ProjectDomainAttributesUpdateRequest project_domain_attributes_update_request:
:rtype: flyteidl.admin..ProjectDomainAttributesUpdateResponse
"""
return self._stub.UpdateProjectDomainAttributes(project_domain_attributes_update_request,
metadata=self._metadata)

@_handle_rpc_error
def update_workflow_attributes(self, workflow_attributes_update_request):
"""
This updates the attributes for a project, domain, and workflow registered with the Flyte Admin Service
:param flyteidl.admin..UpdateWorkflowAttributes workflow_attributes_update_request:
:rtype: flyteidl.admin..workflow_attributes_update_requestResponse
"""
return self._stub.UpdateWorkflowAttributes(workflow_attributes_update_request, metadata=self._metadata)

####################################################################################################################
#
# Event Endpoints
Expand Down
105 changes: 105 additions & 0 deletions flytekit/clis/flyte_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
from flytekit.models.admin import common as _admin_common
from flytekit.models.core import execution as _core_execution_models, identifier as _core_identifier
from flytekit.models.execution import ExecutionSpec as _ExecutionSpec, ExecutionMetadata as _ExecutionMetadata
from flytekit.models.matchable_resource import ClusterResourceAttributes as _ClusterResourceAttributes,\
ExecutionQueueAttributes as _ExecutionQueueAttributes, ExecutionClusterLabel as _ExecutionClusterLabel,\
MatchingAttributes as _MatchingAttributes
from flytekit.models.project import Project as _Project
from flytekit.models.schedule import Schedule as _Schedule
from flytekit.common.exceptions import user as _user_exceptions
Expand Down Expand Up @@ -1719,6 +1722,108 @@ def update_launch_plan_meta(description, host, insecure, project, domain, name):
_click.echo("Successfully updated launch plan")


@_flyte_cli.command('update-cluster-resource-attributes', cls=_FlyteSubCommand)
@_host_option
@_insecure_option
@_project_option
@_domain_option
@_optional_name_option
@_click.option('--attributes', type=(str, str), multiple=True)
def update_cluster_resource_attributes(host, insecure, project, domain, name, attributes):
"""
Sets matchable cluster resource attributes for a project, domain and optionally, workflow name.
e.g.
$ flyte-cli -h localhost:30081 -p flyteexamples -d development update-cluster-resource-attributes \
--attributes cpu 1 --attributes memory 500M
"""
_welcome_message()
client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure)
cluster_resource_attributes = _ClusterResourceAttributes({attribute[0]: attribute[1] for attribute in attributes})
matching_attributes = _MatchingAttributes(cluster_resource_attributes=cluster_resource_attributes)

if name is not None:
client.update_workflow_attributes(
project, domain, name, matching_attributes
)
_click.echo("Successfully updated cluster resource attributes for project: {}, domain: {}, and workflow: {}".
format(project, domain, name))
else:
client.update_project_domain_attributes(
project, domain, matching_attributes
)
_click.echo("Successfully updated cluster resource attributes for project: {} and domain: {}".
format(project, domain))


@_flyte_cli.command('update-execution-queue-attributes', cls=_FlyteSubCommand)
@_host_option
@_insecure_option
@_project_option
@_domain_option
@_optional_name_option
@_click.option("--tags", multiple=True, help="Tag(s) to be applied.")
def update_execution_queue_attributes(host, insecure, project, domain, name, tags):
"""
Tags used for assigning execution queues for tasks belonging to a project, domain and optionally, workflow name.
e.g.
$ flyte-cli -h localhost:30081 -p flyteexamples -d development update-execution-queue-attributes \
--tags critical --tags gpu_intensive
"""
_welcome_message()
client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure)
execution_queue_attributes = _ExecutionQueueAttributes(list(tags))
matching_attributes = _MatchingAttributes(execution_queue_attributes=execution_queue_attributes)

if name is not None:
client.update_workflow_attributes(
project, domain, name, matching_attributes
)
_click.echo("Successfully updated execution queue attributes for project: {}, domain: {}, and workflow: {}".
format(project, domain, name))
else:
client.update_project_domain_attributes(
project, domain, matching_attributes
)
_click.echo("Successfully updated execution queue attributes for project: {} and domain: {}".
format(project, domain))


@_flyte_cli.command('update-execution-cluster-label', cls=_FlyteSubCommand)
@_host_option
@_insecure_option
@_project_option
@_domain_option
@_optional_name_option
@_click.option("--value", help="Cluster label for which to schedule matching executions")
def update_execution_cluster_label(host, insecure, project, domain, name, value):
"""
Label value to determine where an execution's task will be run for tasks belonging to a project, domain and
optionally, workflow name.
e.g.
$ flyte-cli -h localhost:30081 -p flyteexamples -d development update-execution-cluster-label --value foo
"""
_welcome_message()
client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure)
execution_cluster_label = _ExecutionClusterLabel(value)
matching_attributes = _MatchingAttributes(execution_cluster_label=execution_cluster_label)

if name is not None:
client.update_workflow_attributes(
project, domain, name, matching_attributes
)
_click.echo("Successfully updated execution cluster label for project: {}, domain: {}, and workflow: {}".
format(project, domain, name))
else:
client.update_project_domain_attributes(
project, domain, matching_attributes
)
_click.echo("Successfully updated execution cluster label for project: {} and domain: {}".
format(project, domain))


@_flyte_cli.command('setup-config', cls=_click.Command)
@_host_option
@_insecure_option
Expand Down
185 changes: 185 additions & 0 deletions flytekit/models/matchable_resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
from flyteidl.admin import matchable_resource_pb2 as _matchable_resource
from flytekit.models import common as _common


class ClusterResourceAttributes(_common.FlyteIdlEntity):

def __init__(self, attributes):
"""
Custom resource attributes which will be applied in cluster resource creation (e.g. quotas).
Dict keys are the *case-sensitive* names of variables in templatized resource files.
Dict values should be the custom values which get substituted during resource creation.
:param dict[Text, Text] attributes: Applied in cluster resource creation (e.g. quotas).
"""
self._attributes = attributes

@property
def attributes(self):
"""
Custom resource attributes which will be applied in cluster resource management
:rtype: dict[Text, Text]
"""
return self._attributes

def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.matchable_resource_pb2.ClusterResourceAttributes
"""
return _matchable_resource.ClusterResourceAttributes(
attributes=self.attributes,
)

@classmethod
def from_flyte_idl(cls, pb2_object):
"""
:param flyteidl.admin.matchable_resource_pb2.ClusterResourceAttributes pb2_object:
:rtype: ClusterResourceAttributes
"""
return cls(
attributes=pb2_object.attributes,
)


class ExecutionQueueAttributes(_common.FlyteIdlEntity):

def __init__(self, tags):
"""
Tags used for assigning execution queues for tasks matching a project, domain and optionally, workflow.
:param list[Text] tags:
"""
self._tags = tags

@property
def tags(self):
"""
:rtype: list[Text]
"""
return self._tags

def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.matchable_resource_pb2.ExecutionQueueAttributes
"""
return _matchable_resource.ExecutionQueueAttributes(
tags=self.tags,
)

@classmethod
def from_flyte_idl(cls, pb2_object):
"""
:param flyteidl.admin.matchable_resource_pb2.ExecutionQueueAttributes pb2_object:
:rtype: ExecutionQueueAttributes
"""
return cls(
tags=pb2_object.tags,
)


class ExecutionClusterLabel(_common.FlyteIdlEntity):

def __init__(self, value):
"""
Label value to determine where the execution will be run
:param Text value:
"""
self._value = value

@property
def value(self):
"""
:rtype: Text
"""
return self._value

def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.matchable_resource_pb2.ExecutionClusterLabel
"""
return _matchable_resource.ExecutionClusterLabel(
value=self.value,
)

@classmethod
def from_flyte_idl(cls, pb2_object):
"""
:param flyteidl.admin.matchable_resource_pb2.ExecutionClusterLabel pb2_object:
:rtype: ExecutionClusterLabel
"""
return cls(
value=pb2_object.value,
)


class MatchingAttributes(_common.FlyteIdlEntity):
def __init__(self, cluster_resource_attributes=None, execution_queue_attributes=None, execution_cluster_label=None):
"""
At most one target from cluster_resource_attributes, execution_queue_attributes or execution_cluster_label
can be set.
:param ClusterResourceAttributes cluster_resource_attributes:
:param ExecutionQueueAttributes execution_queue_attributes:
:param ExecutionClusterLabel execution_cluster_label:
"""
if cluster_resource_attributes:
if execution_queue_attributes or execution_cluster_label:
raise ValueError("Only one target can be set")
elif execution_queue_attributes and execution_cluster_label:
raise ValueError("Only one target can be set")

self._cluster_resource_attributes = cluster_resource_attributes
self._execution_queue_attributes = execution_queue_attributes
self._execution_cluster_label = execution_cluster_label

@property
def cluster_resource_attributes(self):
"""
Custom resource attributes which will be applied in cluster resource creation (e.g. quotas).
:rtype: ClusterResourceAttributes
"""
return self._cluster_resource_attributes

@property
def execution_queue_attributes(self):
"""
Tags used for assigning execution queues for tasks.
:rtype: ExecutionQueueAttributes
"""
return self._execution_queue_attributes

@property
def execution_cluster_label(self):
"""
Label value to determine where the execution will be run.
:rtype: ExecutionClusterLabel
"""
return self._execution_cluster_label

def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.matchable_resource_pb2.MatchingAttributes
"""
return _matchable_resource.MatchingAttributes(
cluster_resource_attributes=self.cluster_resource_attributes.to_flyte_idl() if
self.cluster_resource_attributes else None,
execution_queue_attributes=self.execution_queue_attributes.to_flyte_idl() if self.execution_queue_attributes
else None,
execution_cluster_label=self.execution_cluster_label.to_flyte_idl() if self.execution_cluster_label
else None,
)

@classmethod
def from_flyte_idl(cls, pb2_object):
"""
:param flyteidl.admin.matchable_resource_pb2.MatchingAttributes pb2_object:
:rtype: MatchingAttributes
"""
return cls(
cluster_resource_attributes=ClusterResourceAttributes.from_flyte_idl(
pb2_object.cluster_resource_attributes) if pb2_object.HasField("cluster_resource_attributes") else None,
execution_queue_attributes=ExecutionQueueAttributes.from_flyte_idl(pb2_object.execution_queue_attributes) if
pb2_object.HasField("execution_queue_attributes") else None,
execution_cluster_label=ExecutionClusterLabel.from_flyte_idl(pb2_object.execution_cluster_label) if
pb2_object.HasField("execution_cluster_label") else None,
)
Loading

0 comments on commit 6aa4aaa

Please sign in to comment.