From 2c98f9850ea60fa918af5acdb36a5d6a6a17b8dc Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Mon, 1 Jun 2020 16:31:57 -0700 Subject: [PATCH 1/4] Matchable Resources --- flytekit/clients/friendly.py | 47 +++++- flytekit/clients/raw.py | 24 +++ flytekit/clis/flyte_cli/main.py | 72 ++++++++ flytekit/models/matchable_resource.py | 159 ++++++++++++++++++ .../unit/models/test_matchable_resource.py | 27 +++ 5 files changed, 328 insertions(+), 1 deletion(-) create mode 100644 flytekit/models/matchable_resource.py create mode 100644 tests/flytekit/unit/models/test_matchable_resource.py diff --git a/flytekit/clients/friendly.py b/flytekit/clients/friendly.py index 7b32109fb6..329be5991e 100644 --- a/flytekit/clients/friendly.py +++ b/flytekit/clients/friendly.py @@ -4,7 +4,8 @@ from flyteidl.admin import task_pb2 as _task_pb2, common_pb2 as _common_pb2, workflow_pb2 as _workflow_pb2, \ launch_plan_pb2 as _launch_plan_pb2, execution_pb2 as _execution_pb2, node_execution_pb2 as _node_execution_pb2, \ - task_execution_pb2 as _task_execution_pb2, project_pb2 as _project_pb2 + task_execution_pb2 as _task_execution_pb2, project_pb2 as _project_pb2, project_domain_attributes_pb2 as \ + _project_domain_attributes_pb2, workflow_attributes_pb2 as _workflow_attributes_pb2 from flyteidl.core import identifier_pb2 as _identifier_pb2 from flytekit.clients.raw import RawSynchronousFlyteClient as _RawSynchronousFlyteClient @@ -885,3 +886,47 @@ def register_project(self, project): project=project.to_flyte_idl(), ) ) + + #################################################################################################################### + # + # Matching Attributes Endpoints + # + #################################################################################################################### + + def update_project_domain_attributes(self, project, domain, matching_attributes): + """ + Sets custom attributes for a project and domain combination. + :param Text project: + :param Text domain: + :param flytekit.models.MatchingAttributes matching_attributes: + :return: + """ + super(SynchronousFlyteClient, self).update_project_domain_attributes( + _project_domain_attributes_pb2.ProjectDomainAttributesUpdateRequest( + attributes=_project_domain_attributes_pb2.ProjectDomainAttributes( + project=project, + domain=domain, + matching_attributes=matching_attributes.to_flyte_idl(), + ) + ) + ) + + def update_workflow_attributes(self, project, domain, workflow, matching_attributes): + """ + Sets custom attributes for a project, domain, and workflow combination. + :param Text project: + :param Text domain: + :param Text workflow: + :param flytekit.models.MatchingAttributes matching_attributes: + :return: + """ + super(SynchronousFlyteClient, self).update_workflow_attributes( + _workflow_attributes_pb2.WorkflowAttributesUpdateRequest( + attributes=_workflow_attributes_pb2.WorkflowAttributes( + project=project, + domain=domain, + workflow=workflow, + matching_attributes=matching_attributes.to_flyte_idl(), + ) + ) + ) diff --git a/flytekit/clients/raw.py b/flytekit/clients/raw.py index 71bf249126..3f31a0e9e9 100644 --- a/flytekit/clients/raw.py +++ b/flytekit/clients/raw.py @@ -571,6 +571,30 @@ def register_project(self, project_register_request): """ return self._stub.RegisterProject(project_register_request, metadata=self._metadata) + #################################################################################################################### + # + # Matching Attributes Endpoints + # + #################################################################################################################### + @_handle_rpc_error + def update_project_domain_attributes(self, project_domain_attributes_update_request): + """ + This updates the attributes for a project and domain registered with the Flyte Admin Service + :param flyteidl.admin..ProjectDomainAttributesUpdateRequest project_domain_attributes_update_request: + :rtype: flyteidl.admin..ProjectDomainAttributesUpdateResponse + """ + return self._stub.UpdateProjectDomainAttributes(project_domain_attributes_update_request, + metadata=self._metadata) + + @_handle_rpc_error + def update_workflow_attributes(self, workflow_attributes_update_request): + """ + This updates the attributes for a project, domain, and workflow registered with the Flyte Admin Service + :param flyteidl.admin..UpdateWorkflowAttributes workflow_attributes_update_request: + :rtype: flyteidl.admin..workflow_attributes_update_requestResponse + """ + return self._stub.UpdateWorkflowAttributes(workflow_attributes_update_request, metadata=self._metadata) + #################################################################################################################### # # Event Endpoints diff --git a/flytekit/clis/flyte_cli/main.py b/flytekit/clis/flyte_cli/main.py index 2c573ab960..0bceb6d821 100644 --- a/flytekit/clis/flyte_cli/main.py +++ b/flytekit/clis/flyte_cli/main.py @@ -28,6 +28,8 @@ from flytekit.models.admin import common as _admin_common from flytekit.models.core import execution as _core_execution_models, identifier as _core_identifier from flytekit.models.execution import ExecutionSpec as _ExecutionSpec, ExecutionMetadata as _ExecutionMetadata +from flytekit.models.matchable_resource import ClusterResourceAttributes as _ClusterResourceAttributes,\ + ExecutionQueueAttributes as _ExecutionQueueAttributes, MatchingAttributes as _MatchingAttributes from flytekit.models.project import Project as _Project from flytekit.models.schedule import Schedule as _Schedule from flytekit.common.exceptions import user as _user_exceptions @@ -1675,6 +1677,76 @@ def update_launch_plan_meta(description, host, insecure, project, domain, name): _click.echo("Successfully updated launch plan") +@_flyte_cli.command('update-cluster-resource-attributes', cls=_FlyteSubCommand) +@_host_option +@_insecure_option +@_project_option +@_domain_option +@_optional_name_option +@_click.argument('attributes', nargs=-1, type=_click.UNPROCESSED) +def update_cluster_resource_attributes(host, insecure, project, domain, name, attributes): + """ + Sets matchable cluster resource attributes for a project, domain and optionally, workflow name. + + Use a -- to separate arguments to this cli, and attributes used for cluster resource configuration. + e.g. + $ flyte-cli -h localhost:30081 -p flyteexamples -d development update-cluster-resource-attributes -- cpu=1 \ + memory=500M + """ + _welcome_message() + client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure) + cluster_resource_attributes = _ClusterResourceAttributes(_parse_args_into_dict(attributes)) + matching_attributes = _MatchingAttributes(cluster_resource_attributes=cluster_resource_attributes) + + if name is not None: + client.update_workflow_attributes( + project, domain, name, matching_attributes + ) + _click.echo("Successfully updated cluster resource attributes for project: {}, domain: {}, and workflow: {}". + format(project, domain, name)) + else: + client.update_project_domain_attributes( + project, domain, matching_attributes + ) + _click.echo("Successfully updated cluster resource attributes for project: {} and domain: {}". + format(project, domain)) + + +@_flyte_cli.command('update-execution-queue-attributes', cls=_FlyteSubCommand) +@_host_option +@_insecure_option +@_project_option +@_domain_option +@_optional_name_option +@_click.argument('tags', nargs=-1, type=_click.UNPROCESSED) +def update_execution_queue_attributes(host, insecure, project, domain, name, tags): + """ + Tags used for assigning execution queues for tasks belonging to a project, domain and optionally, workflow name. + + Use a -- to separate arguments to this cli, and attributes used for cluster resource configuration. + e.g. + $ flyte-cli -h localhost:30081 -p flyteexamples -d development update-cluster-resource-attributes -- critical \ + production gpu_required + """ + _welcome_message() + client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure) + execution_queue_attributes = _ExecutionQueueAttributes(list(tags)) + matching_attributes = _MatchingAttributes(execution_queue_attributes=execution_queue_attributes) + + if name is not None: + client.update_workflow_attributes( + project, domain, name, matching_attributes + ) + _click.echo("Successfully updated cluster resource attributes for project: {}, domain: {}, and workflow: {}". + format(project, domain, name)) + else: + client.update_project_domain_attributes( + project, domain, matching_attributes + ) + _click.echo("Successfully updated cluster resource attributes for project: {} and domain: {}". + format(project, domain)) + + @_flyte_cli.command('setup-config', cls=_click.Command) @_host_option @_insecure_option diff --git a/flytekit/models/matchable_resource.py b/flytekit/models/matchable_resource.py new file mode 100644 index 0000000000..6075b8e46e --- /dev/null +++ b/flytekit/models/matchable_resource.py @@ -0,0 +1,159 @@ +from flyteidl.admin import matchable_resource_pb2 as _matchable_resource +from flytekit.models import common as _common + + +class MatchableResource(object): + # Applies to customizable task resource requests and limits. + TASK_RESOURCE = _matchable_resource.TASK_RESOURCE + # Applies to configuring templated kubernetes cluster resources. + CLUSTER_RESOURCE = _matchable_resource.CLUSTER_RESOURCE + # Configures task and dynamic task execution queue assignment. + EXECUTION_QUEUE = _matchable_resource.EXECUTION_QUEUE + # Configures the K8s cluster label to be used for execution to be run + EXECUTION_CLUSTER_LABEL = _matchable_resource.EXECUTION_CLUSTER_LABEL + + @classmethod + def enum_to_string(cls, val): + """ + :param int val: + :rtype: Text + """ + if val == cls.TASK_RESOURCE: + return "TASK_RESOURCE" + elif val == cls.CLUSTER_RESOURCE: + return "CLUSTER_RESOURCE" + elif val == cls.EXECUTION_QUEUE: + return "EXECUTION_QUEUE" + elif val == cls.EXECUTION_CLUSTER_LABEL: + return "EXECUTION_CLUSTER_LABEL" + else: + return "" + + +class ClusterResourceAttributes(_common.FlyteIdlEntity): + + def __init__(self, attributes): + """ + Custom resource attributes which will be applied in cluster resource creation (e.g. quotas). + Dict keys are the *case-sensitive* names of variables in templatized resource files. + Dict values should be the custom values which get substituted during resource creation. + + :param dict[Text, Text] attributes: Applied in cluster resource creation (e.g. quotas). + """ + self._attributes = attributes + + @property + def attributes(self): + """ + Custom resource attributes which will be applied in cluster resource management + :rtype: dict[Text, Text] + """ + return self._attributes + + def to_flyte_idl(self): + """ + :rtype: flyteidl.admin.matchable_resource_pb2.ClusterResourceAttributes + """ + return _matchable_resource.ClusterResourceAttributes( + attributes=self.attributes, + ) + + @classmethod + def from_flyte_idl(cls, pb2_object): + """ + :param flyteidl.admin.matchable_resource_pb2.ClusterResourceAttributes pb2_object: + :rtype: ClusterResourceAttributes + """ + return cls( + attributes=pb2_object.attributes, + ) + + +class ExecutionQueueAttributes(_common.FlyteIdlEntity): + + def __init__(self, tags): + """ + Tags used for assigning execution queues for tasks matching a project, domain and optionally, workflow. + + :param list[Text] tags: + """ + self._tags = tags + + @property + def tags(self): + """ + :rtype: list[Text] + """ + return self._tags + + def to_flyte_idl(self): + """ + :rtype: flyteidl.admin.matchable_resource_pb2.ExecutionQueueAttributes + """ + return _matchable_resource.ExecutionQueueAttributes( + tags=self.tags, + ) + + @classmethod + def from_flyte_idl(cls, pb2_object): + """ + :param flyteidl.admin.matchable_resource_pb2.ExecutionQueueAttributes pb2_object: + :rtype: ExecutionQueueAttributes + """ + return cls( + tags=pb2_object.tags, + ) + + +class MatchingAttributes(_common.FlyteIdlEntity): + def __init__(self, cluster_resource_attributes=None, execution_queue_attributes=None): + """ + At most one target from task_resource_attributes, cluster_resource_attributes, execution_queue_attributes or + execution_cluster_label can be set. + :param ClusterResourceAttributes cluster_resource_attributes: + :param ExecutionQueueAttributes execution_queue_attributes: + """ + if cluster_resource_attributes and execution_queue_attributes: + raise ValueError("Only one of cluster_resource_attributes or execution_queue_attributes can be set") + self._cluster_resource_attributes = cluster_resource_attributes + self._execution_queue_attributes = execution_queue_attributes + + @property + def cluster_resource_attributes(self): + """ + Custom resource attributes which will be applied in cluster resource creation (e.g. quotas). + :rtype: ClusterResourceAttributes + """ + return self._cluster_resource_attributes + + @property + def execution_queue_attributes(self): + """ + Tags used for assigning execution queues for tasks. + :rtype: ExecutionQueueAttributes + """ + return self._execution_queue_attributes + + def to_flyte_idl(self): + """ + :rtype: flyteidl.admin.matchable_resource_pb2.MatchingAttributes + """ + return _matchable_resource.MatchingAttributes( + cluster_resource_attributes=self.cluster_resource_attributes.to_flyte_idl() if + self.cluster_resource_attributes else None, + execution_queue_attributes=self.execution_queue_attributes.to_flyte_idl() if self.execution_queue_attributes + else None, + ) + + @classmethod + def from_flyte_idl(cls, pb2_object): + """ + :param flyteidl.admin.matchable_resource_pb2.MatchingAttributes pb2_object: + :rtype: MatchingAttributes + """ + return cls( + cluster_resource_attributes=ClusterResourceAttributes.from_flyte_idl( + pb2_object.cluster_resource_attributes) if pb2_object.HasField("cluster_resource_attributes") else None, + execution_queue_attributes=ExecutionQueueAttributes.from_flyte_idl(pb2_object.execution_queue_attributes) if + pb2_object.HasField("execution_queue_attributes") else None, + ) diff --git a/tests/flytekit/unit/models/test_matchable_resource.py b/tests/flytekit/unit/models/test_matchable_resource.py new file mode 100644 index 0000000000..684caf7b6a --- /dev/null +++ b/tests/flytekit/unit/models/test_matchable_resource.py @@ -0,0 +1,27 @@ +from __future__ import absolute_import + +from flytekit.models import matchable_resource + + +def test_cluster_resource_attributes(): + obj = matchable_resource.ClusterResourceAttributes({"cpu": "one million", "gpu": "just one"}) + assert obj.attributes == {"cpu": "one million", "gpu": "just one"} + assert obj == matchable_resource.ClusterResourceAttributes.from_flyte_idl(obj.to_flyte_idl()) + + +def test_execution_queue_attributes(): + obj = matchable_resource.ExecutionQueueAttributes(["foo", "bar", "baz"]) + assert obj.tags == ["foo", "bar", "baz"] + assert obj == matchable_resource.ExecutionQueueAttributes.from_flyte_idl(obj.to_flyte_idl()) + + +def test_matchable_resource(): + cluster_resource_attrs = matchable_resource.ClusterResourceAttributes({"cpu": "one million", "gpu": "just one"}) + obj = matchable_resource.MatchingAttributes(cluster_resource_attributes=cluster_resource_attrs) + assert obj.cluster_resource_attributes == cluster_resource_attrs + assert obj == matchable_resource.MatchingAttributes.from_flyte_idl(obj.to_flyte_idl()) + + execution_queue_attributes = matchable_resource.ExecutionQueueAttributes(["foo", "bar", "baz"]) + obj2 = matchable_resource.MatchingAttributes(execution_queue_attributes=execution_queue_attributes) + assert obj2.execution_queue_attributes == execution_queue_attributes + assert obj2 == matchable_resource.MatchingAttributes.from_flyte_idl(obj2.to_flyte_idl()) From efbfeabf4d12a14b66b0673163f8dd02bdfecda9 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Mon, 1 Jun 2020 16:38:02 -0700 Subject: [PATCH 2/4] clean up --- flytekit/__init__.py | 2 +- flytekit/clis/flyte_cli/main.py | 4 ++-- flytekit/models/matchable_resource.py | 31 +-------------------------- 3 files changed, 4 insertions(+), 33 deletions(-) diff --git a/flytekit/__init__.py b/flytekit/__init__.py index 875147396a..409eb5113f 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -2,4 +2,4 @@ import flytekit.plugins -__version__ = '0.8.1' +__version__ = '0.8.2' diff --git a/flytekit/clis/flyte_cli/main.py b/flytekit/clis/flyte_cli/main.py index 0bceb6d821..39a0ec0c81 100644 --- a/flytekit/clis/flyte_cli/main.py +++ b/flytekit/clis/flyte_cli/main.py @@ -1737,13 +1737,13 @@ def update_execution_queue_attributes(host, insecure, project, domain, name, tag client.update_workflow_attributes( project, domain, name, matching_attributes ) - _click.echo("Successfully updated cluster resource attributes for project: {}, domain: {}, and workflow: {}". + _click.echo("Successfully updated execution queue attributes for project: {}, domain: {}, and workflow: {}". format(project, domain, name)) else: client.update_project_domain_attributes( project, domain, matching_attributes ) - _click.echo("Successfully updated cluster resource attributes for project: {} and domain: {}". + _click.echo("Successfully updated execution queue attributes for project: {} and domain: {}". format(project, domain)) diff --git a/flytekit/models/matchable_resource.py b/flytekit/models/matchable_resource.py index 6075b8e46e..9434885939 100644 --- a/flytekit/models/matchable_resource.py +++ b/flytekit/models/matchable_resource.py @@ -2,34 +2,6 @@ from flytekit.models import common as _common -class MatchableResource(object): - # Applies to customizable task resource requests and limits. - TASK_RESOURCE = _matchable_resource.TASK_RESOURCE - # Applies to configuring templated kubernetes cluster resources. - CLUSTER_RESOURCE = _matchable_resource.CLUSTER_RESOURCE - # Configures task and dynamic task execution queue assignment. - EXECUTION_QUEUE = _matchable_resource.EXECUTION_QUEUE - # Configures the K8s cluster label to be used for execution to be run - EXECUTION_CLUSTER_LABEL = _matchable_resource.EXECUTION_CLUSTER_LABEL - - @classmethod - def enum_to_string(cls, val): - """ - :param int val: - :rtype: Text - """ - if val == cls.TASK_RESOURCE: - return "TASK_RESOURCE" - elif val == cls.CLUSTER_RESOURCE: - return "CLUSTER_RESOURCE" - elif val == cls.EXECUTION_QUEUE: - return "EXECUTION_QUEUE" - elif val == cls.EXECUTION_CLUSTER_LABEL: - return "EXECUTION_CLUSTER_LABEL" - else: - return "" - - class ClusterResourceAttributes(_common.FlyteIdlEntity): def __init__(self, attributes): @@ -108,8 +80,7 @@ def from_flyte_idl(cls, pb2_object): class MatchingAttributes(_common.FlyteIdlEntity): def __init__(self, cluster_resource_attributes=None, execution_queue_attributes=None): """ - At most one target from task_resource_attributes, cluster_resource_attributes, execution_queue_attributes or - execution_cluster_label can be set. + At most one target from cluster_resource_attributes or execution_queue_attributes can be set. :param ClusterResourceAttributes cluster_resource_attributes: :param ExecutionQueueAttributes execution_queue_attributes: """ From 1d734faf6dd29fb7a085ebd4852d98f9b3f4f515 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Mon, 1 Jun 2020 17:21:55 -0700 Subject: [PATCH 3/4] review comments --- flytekit/clis/flyte_cli/main.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/flytekit/clis/flyte_cli/main.py b/flytekit/clis/flyte_cli/main.py index 39a0ec0c81..189e664a93 100644 --- a/flytekit/clis/flyte_cli/main.py +++ b/flytekit/clis/flyte_cli/main.py @@ -1683,19 +1683,19 @@ def update_launch_plan_meta(description, host, insecure, project, domain, name): @_project_option @_domain_option @_optional_name_option -@_click.argument('attributes', nargs=-1, type=_click.UNPROCESSED) +@_click.option('--attributes', type=(str, str), multiple=True) def update_cluster_resource_attributes(host, insecure, project, domain, name, attributes): """ Sets matchable cluster resource attributes for a project, domain and optionally, workflow name. Use a -- to separate arguments to this cli, and attributes used for cluster resource configuration. e.g. - $ flyte-cli -h localhost:30081 -p flyteexamples -d development update-cluster-resource-attributes -- cpu=1 \ - memory=500M + $ flyte-cli -h localhost:30081 -p flyteexamples -d development update-cluster-resource-attributes \ + --attributes cpu 1 --attributes memory 500M """ _welcome_message() client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure) - cluster_resource_attributes = _ClusterResourceAttributes(_parse_args_into_dict(attributes)) + cluster_resource_attributes = _ClusterResourceAttributes({attribute[0]: attribute[1] for attribute in attributes}) matching_attributes = _MatchingAttributes(cluster_resource_attributes=cluster_resource_attributes) if name is not None: @@ -1718,15 +1718,15 @@ def update_cluster_resource_attributes(host, insecure, project, domain, name, at @_project_option @_domain_option @_optional_name_option -@_click.argument('tags', nargs=-1, type=_click.UNPROCESSED) +@_click.option("--tags", multiple=True, help="Tag(s) to be applied.") def update_execution_queue_attributes(host, insecure, project, domain, name, tags): """ Tags used for assigning execution queues for tasks belonging to a project, domain and optionally, workflow name. Use a -- to separate arguments to this cli, and attributes used for cluster resource configuration. e.g. - $ flyte-cli -h localhost:30081 -p flyteexamples -d development update-cluster-resource-attributes -- critical \ - production gpu_required + $ flyte-cli -h localhost:30081 -p flyteexamples -d development update-cluster-resource-attributes \ + --tags critical --tags gpu_intensive """ _welcome_message() client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure) From 18b6e4bacdde7f28aff3680a86ac3097b39e97d4 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Thu, 4 Jun 2020 16:40:31 -0700 Subject: [PATCH 4/4] Add cluster execution label too --- flytekit/clis/flyte_cli/main.py | 41 ++++++++++-- flytekit/models/matchable_resource.py | 63 +++++++++++++++++-- .../unit/models/test_matchable_resource.py | 11 ++++ 3 files changed, 107 insertions(+), 8 deletions(-) diff --git a/flytekit/clis/flyte_cli/main.py b/flytekit/clis/flyte_cli/main.py index c6c780d089..e904eecccc 100644 --- a/flytekit/clis/flyte_cli/main.py +++ b/flytekit/clis/flyte_cli/main.py @@ -30,7 +30,8 @@ from flytekit.models.core import execution as _core_execution_models, identifier as _core_identifier from flytekit.models.execution import ExecutionSpec as _ExecutionSpec, ExecutionMetadata as _ExecutionMetadata from flytekit.models.matchable_resource import ClusterResourceAttributes as _ClusterResourceAttributes,\ - ExecutionQueueAttributes as _ExecutionQueueAttributes, MatchingAttributes as _MatchingAttributes + ExecutionQueueAttributes as _ExecutionQueueAttributes, ExecutionClusterLabel as _ExecutionClusterLabel,\ + MatchingAttributes as _MatchingAttributes from flytekit.models.project import Project as _Project from flytekit.models.schedule import Schedule as _Schedule from flytekit.common.exceptions import user as _user_exceptions @@ -1732,7 +1733,6 @@ def update_cluster_resource_attributes(host, insecure, project, domain, name, at """ Sets matchable cluster resource attributes for a project, domain and optionally, workflow name. - Use a -- to separate arguments to this cli, and attributes used for cluster resource configuration. e.g. $ flyte-cli -h localhost:30081 -p flyteexamples -d development update-cluster-resource-attributes \ --attributes cpu 1 --attributes memory 500M @@ -1767,9 +1767,8 @@ def update_execution_queue_attributes(host, insecure, project, domain, name, tag """ Tags used for assigning execution queues for tasks belonging to a project, domain and optionally, workflow name. - Use a -- to separate arguments to this cli, and attributes used for cluster resource configuration. e.g. - $ flyte-cli -h localhost:30081 -p flyteexamples -d development update-cluster-resource-attributes \ + $ flyte-cli -h localhost:30081 -p flyteexamples -d development update-execution-queue-attributes \ --tags critical --tags gpu_intensive """ _welcome_message() @@ -1791,6 +1790,40 @@ def update_execution_queue_attributes(host, insecure, project, domain, name, tag format(project, domain)) +@_flyte_cli.command('update-execution-cluster-label', cls=_FlyteSubCommand) +@_host_option +@_insecure_option +@_project_option +@_domain_option +@_optional_name_option +@_click.option("--value", help="Cluster label for which to schedule matching executions") +def update_execution_cluster_label(host, insecure, project, domain, name, value): + """ + Label value to determine where an execution's task will be run for tasks belonging to a project, domain and + optionally, workflow name. + + e.g. + $ flyte-cli -h localhost:30081 -p flyteexamples -d development update-execution-cluster-label --value foo + """ + _welcome_message() + client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure) + execution_cluster_label = _ExecutionClusterLabel(value) + matching_attributes = _MatchingAttributes(execution_cluster_label=execution_cluster_label) + + if name is not None: + client.update_workflow_attributes( + project, domain, name, matching_attributes + ) + _click.echo("Successfully updated execution cluster label for project: {}, domain: {}, and workflow: {}". + format(project, domain, name)) + else: + client.update_project_domain_attributes( + project, domain, matching_attributes + ) + _click.echo("Successfully updated execution cluster label for project: {} and domain: {}". + format(project, domain)) + + @_flyte_cli.command('setup-config', cls=_click.Command) @_host_option @_insecure_option diff --git a/flytekit/models/matchable_resource.py b/flytekit/models/matchable_resource.py index 9434885939..46171b0c19 100644 --- a/flytekit/models/matchable_resource.py +++ b/flytekit/models/matchable_resource.py @@ -77,17 +77,60 @@ def from_flyte_idl(cls, pb2_object): ) +class ExecutionClusterLabel(_common.FlyteIdlEntity): + + def __init__(self, value): + """ + Label value to determine where the execution will be run + + :param Text value: + """ + self._value = value + + @property + def value(self): + """ + :rtype: Text + """ + return self._value + + def to_flyte_idl(self): + """ + :rtype: flyteidl.admin.matchable_resource_pb2.ExecutionClusterLabel + """ + return _matchable_resource.ExecutionClusterLabel( + value=self.value, + ) + + @classmethod + def from_flyte_idl(cls, pb2_object): + """ + :param flyteidl.admin.matchable_resource_pb2.ExecutionClusterLabel pb2_object: + :rtype: ExecutionClusterLabel + """ + return cls( + value=pb2_object.value, + ) + + class MatchingAttributes(_common.FlyteIdlEntity): - def __init__(self, cluster_resource_attributes=None, execution_queue_attributes=None): + def __init__(self, cluster_resource_attributes=None, execution_queue_attributes=None, execution_cluster_label=None): """ - At most one target from cluster_resource_attributes or execution_queue_attributes can be set. + At most one target from cluster_resource_attributes, execution_queue_attributes or execution_cluster_label + can be set. :param ClusterResourceAttributes cluster_resource_attributes: :param ExecutionQueueAttributes execution_queue_attributes: + :param ExecutionClusterLabel execution_cluster_label: """ - if cluster_resource_attributes and execution_queue_attributes: - raise ValueError("Only one of cluster_resource_attributes or execution_queue_attributes can be set") + if cluster_resource_attributes: + if execution_queue_attributes or execution_cluster_label: + raise ValueError("Only one target can be set") + elif execution_queue_attributes and execution_cluster_label: + raise ValueError("Only one target can be set") + self._cluster_resource_attributes = cluster_resource_attributes self._execution_queue_attributes = execution_queue_attributes + self._execution_cluster_label = execution_cluster_label @property def cluster_resource_attributes(self): @@ -105,6 +148,14 @@ def execution_queue_attributes(self): """ return self._execution_queue_attributes + @property + def execution_cluster_label(self): + """ + Label value to determine where the execution will be run. + :rtype: ExecutionClusterLabel + """ + return self._execution_cluster_label + def to_flyte_idl(self): """ :rtype: flyteidl.admin.matchable_resource_pb2.MatchingAttributes @@ -114,6 +165,8 @@ def to_flyte_idl(self): self.cluster_resource_attributes else None, execution_queue_attributes=self.execution_queue_attributes.to_flyte_idl() if self.execution_queue_attributes else None, + execution_cluster_label=self.execution_cluster_label.to_flyte_idl() if self.execution_cluster_label + else None, ) @classmethod @@ -127,4 +180,6 @@ def from_flyte_idl(cls, pb2_object): pb2_object.cluster_resource_attributes) if pb2_object.HasField("cluster_resource_attributes") else None, execution_queue_attributes=ExecutionQueueAttributes.from_flyte_idl(pb2_object.execution_queue_attributes) if pb2_object.HasField("execution_queue_attributes") else None, + execution_cluster_label=ExecutionClusterLabel.from_flyte_idl(pb2_object.execution_cluster_label) if + pb2_object.HasField("execution_cluster_label") else None, ) diff --git a/tests/flytekit/unit/models/test_matchable_resource.py b/tests/flytekit/unit/models/test_matchable_resource.py index 684caf7b6a..229f0797e8 100644 --- a/tests/flytekit/unit/models/test_matchable_resource.py +++ b/tests/flytekit/unit/models/test_matchable_resource.py @@ -15,6 +15,12 @@ def test_execution_queue_attributes(): assert obj == matchable_resource.ExecutionQueueAttributes.from_flyte_idl(obj.to_flyte_idl()) +def test_execution_cluster_label(): + obj = matchable_resource.ExecutionClusterLabel("my_cluster") + assert obj.value == "my_cluster" + assert obj == matchable_resource.ExecutionClusterLabel.from_flyte_idl(obj.to_flyte_idl()) + + def test_matchable_resource(): cluster_resource_attrs = matchable_resource.ClusterResourceAttributes({"cpu": "one million", "gpu": "just one"}) obj = matchable_resource.MatchingAttributes(cluster_resource_attributes=cluster_resource_attrs) @@ -25,3 +31,8 @@ def test_matchable_resource(): obj2 = matchable_resource.MatchingAttributes(execution_queue_attributes=execution_queue_attributes) assert obj2.execution_queue_attributes == execution_queue_attributes assert obj2 == matchable_resource.MatchingAttributes.from_flyte_idl(obj2.to_flyte_idl()) + + execution_cluster_label = matchable_resource.ExecutionClusterLabel("my_cluster") + obj2 = matchable_resource.MatchingAttributes(execution_cluster_label=execution_cluster_label) + assert obj2.execution_cluster_label == execution_cluster_label + assert obj2 == matchable_resource.MatchingAttributes.from_flyte_idl(obj2.to_flyte_idl())