diff --git a/flytekit/__init__.py b/flytekit/__init__.py index 928d698d7d..02ed811b13 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -2,4 +2,4 @@ import flytekit.plugins -__version__ = '0.9.0b0' +__version__ = '0.9.0b1' diff --git a/flytekit/clients/friendly.py b/flytekit/clients/friendly.py index 7b32109fb6..329be5991e 100644 --- a/flytekit/clients/friendly.py +++ b/flytekit/clients/friendly.py @@ -4,7 +4,8 @@ from flyteidl.admin import task_pb2 as _task_pb2, common_pb2 as _common_pb2, workflow_pb2 as _workflow_pb2, \ launch_plan_pb2 as _launch_plan_pb2, execution_pb2 as _execution_pb2, node_execution_pb2 as _node_execution_pb2, \ - task_execution_pb2 as _task_execution_pb2, project_pb2 as _project_pb2 + task_execution_pb2 as _task_execution_pb2, project_pb2 as _project_pb2, project_domain_attributes_pb2 as \ + _project_domain_attributes_pb2, workflow_attributes_pb2 as _workflow_attributes_pb2 from flyteidl.core import identifier_pb2 as _identifier_pb2 from flytekit.clients.raw import RawSynchronousFlyteClient as _RawSynchronousFlyteClient @@ -885,3 +886,47 @@ def register_project(self, project): project=project.to_flyte_idl(), ) ) + + #################################################################################################################### + # + # Matching Attributes Endpoints + # + #################################################################################################################### + + def update_project_domain_attributes(self, project, domain, matching_attributes): + """ + Sets custom attributes for a project and domain combination. + :param Text project: + :param Text domain: + :param flytekit.models.MatchingAttributes matching_attributes: + :return: + """ + super(SynchronousFlyteClient, self).update_project_domain_attributes( + _project_domain_attributes_pb2.ProjectDomainAttributesUpdateRequest( + attributes=_project_domain_attributes_pb2.ProjectDomainAttributes( + project=project, + domain=domain, + matching_attributes=matching_attributes.to_flyte_idl(), + ) + ) + ) + + def update_workflow_attributes(self, project, domain, workflow, matching_attributes): + """ + Sets custom attributes for a project, domain, and workflow combination. + :param Text project: + :param Text domain: + :param Text workflow: + :param flytekit.models.MatchingAttributes matching_attributes: + :return: + """ + super(SynchronousFlyteClient, self).update_workflow_attributes( + _workflow_attributes_pb2.WorkflowAttributesUpdateRequest( + attributes=_workflow_attributes_pb2.WorkflowAttributes( + project=project, + domain=domain, + workflow=workflow, + matching_attributes=matching_attributes.to_flyte_idl(), + ) + ) + ) diff --git a/flytekit/clients/raw.py b/flytekit/clients/raw.py index 88ec062262..57c919998f 100644 --- a/flytekit/clients/raw.py +++ b/flytekit/clients/raw.py @@ -571,6 +571,30 @@ def register_project(self, project_register_request): """ return self._stub.RegisterProject(project_register_request, metadata=self._metadata) + #################################################################################################################### + # + # Matching Attributes Endpoints + # + #################################################################################################################### + @_handle_rpc_error + def update_project_domain_attributes(self, project_domain_attributes_update_request): + """ + This updates the attributes for a project and domain registered with the Flyte Admin Service + :param flyteidl.admin..ProjectDomainAttributesUpdateRequest project_domain_attributes_update_request: + :rtype: flyteidl.admin..ProjectDomainAttributesUpdateResponse + """ + return self._stub.UpdateProjectDomainAttributes(project_domain_attributes_update_request, + metadata=self._metadata) + + @_handle_rpc_error + def update_workflow_attributes(self, workflow_attributes_update_request): + """ + This updates the attributes for a project, domain, and workflow registered with the Flyte Admin Service + :param flyteidl.admin..UpdateWorkflowAttributes workflow_attributes_update_request: + :rtype: flyteidl.admin..workflow_attributes_update_requestResponse + """ + return self._stub.UpdateWorkflowAttributes(workflow_attributes_update_request, metadata=self._metadata) + #################################################################################################################### # # Event Endpoints diff --git a/flytekit/clis/flyte_cli/main.py b/flytekit/clis/flyte_cli/main.py index 389de517df..e904eecccc 100644 --- a/flytekit/clis/flyte_cli/main.py +++ b/flytekit/clis/flyte_cli/main.py @@ -29,6 +29,9 @@ from flytekit.models.admin import common as _admin_common from flytekit.models.core import execution as _core_execution_models, identifier as _core_identifier from flytekit.models.execution import ExecutionSpec as _ExecutionSpec, ExecutionMetadata as _ExecutionMetadata +from flytekit.models.matchable_resource import ClusterResourceAttributes as _ClusterResourceAttributes,\ + ExecutionQueueAttributes as _ExecutionQueueAttributes, ExecutionClusterLabel as _ExecutionClusterLabel,\ + MatchingAttributes as _MatchingAttributes from flytekit.models.project import Project as _Project from flytekit.models.schedule import Schedule as _Schedule from flytekit.common.exceptions import user as _user_exceptions @@ -1719,6 +1722,108 @@ def update_launch_plan_meta(description, host, insecure, project, domain, name): _click.echo("Successfully updated launch plan") +@_flyte_cli.command('update-cluster-resource-attributes', cls=_FlyteSubCommand) +@_host_option +@_insecure_option +@_project_option +@_domain_option +@_optional_name_option +@_click.option('--attributes', type=(str, str), multiple=True) +def update_cluster_resource_attributes(host, insecure, project, domain, name, attributes): + """ + Sets matchable cluster resource attributes for a project, domain and optionally, workflow name. + + e.g. + $ flyte-cli -h localhost:30081 -p flyteexamples -d development update-cluster-resource-attributes \ + --attributes cpu 1 --attributes memory 500M + """ + _welcome_message() + client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure) + cluster_resource_attributes = _ClusterResourceAttributes({attribute[0]: attribute[1] for attribute in attributes}) + matching_attributes = _MatchingAttributes(cluster_resource_attributes=cluster_resource_attributes) + + if name is not None: + client.update_workflow_attributes( + project, domain, name, matching_attributes + ) + _click.echo("Successfully updated cluster resource attributes for project: {}, domain: {}, and workflow: {}". + format(project, domain, name)) + else: + client.update_project_domain_attributes( + project, domain, matching_attributes + ) + _click.echo("Successfully updated cluster resource attributes for project: {} and domain: {}". + format(project, domain)) + + +@_flyte_cli.command('update-execution-queue-attributes', cls=_FlyteSubCommand) +@_host_option +@_insecure_option +@_project_option +@_domain_option +@_optional_name_option +@_click.option("--tags", multiple=True, help="Tag(s) to be applied.") +def update_execution_queue_attributes(host, insecure, project, domain, name, tags): + """ + Tags used for assigning execution queues for tasks belonging to a project, domain and optionally, workflow name. + + e.g. + $ flyte-cli -h localhost:30081 -p flyteexamples -d development update-execution-queue-attributes \ + --tags critical --tags gpu_intensive + """ + _welcome_message() + client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure) + execution_queue_attributes = _ExecutionQueueAttributes(list(tags)) + matching_attributes = _MatchingAttributes(execution_queue_attributes=execution_queue_attributes) + + if name is not None: + client.update_workflow_attributes( + project, domain, name, matching_attributes + ) + _click.echo("Successfully updated execution queue attributes for project: {}, domain: {}, and workflow: {}". + format(project, domain, name)) + else: + client.update_project_domain_attributes( + project, domain, matching_attributes + ) + _click.echo("Successfully updated execution queue attributes for project: {} and domain: {}". + format(project, domain)) + + +@_flyte_cli.command('update-execution-cluster-label', cls=_FlyteSubCommand) +@_host_option +@_insecure_option +@_project_option +@_domain_option +@_optional_name_option +@_click.option("--value", help="Cluster label for which to schedule matching executions") +def update_execution_cluster_label(host, insecure, project, domain, name, value): + """ + Label value to determine where an execution's task will be run for tasks belonging to a project, domain and + optionally, workflow name. + + e.g. + $ flyte-cli -h localhost:30081 -p flyteexamples -d development update-execution-cluster-label --value foo + """ + _welcome_message() + client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure) + execution_cluster_label = _ExecutionClusterLabel(value) + matching_attributes = _MatchingAttributes(execution_cluster_label=execution_cluster_label) + + if name is not None: + client.update_workflow_attributes( + project, domain, name, matching_attributes + ) + _click.echo("Successfully updated execution cluster label for project: {}, domain: {}, and workflow: {}". + format(project, domain, name)) + else: + client.update_project_domain_attributes( + project, domain, matching_attributes + ) + _click.echo("Successfully updated execution cluster label for project: {} and domain: {}". + format(project, domain)) + + @_flyte_cli.command('setup-config', cls=_click.Command) @_host_option @_insecure_option diff --git a/flytekit/models/matchable_resource.py b/flytekit/models/matchable_resource.py new file mode 100644 index 0000000000..46171b0c19 --- /dev/null +++ b/flytekit/models/matchable_resource.py @@ -0,0 +1,185 @@ +from flyteidl.admin import matchable_resource_pb2 as _matchable_resource +from flytekit.models import common as _common + + +class ClusterResourceAttributes(_common.FlyteIdlEntity): + + def __init__(self, attributes): + """ + Custom resource attributes which will be applied in cluster resource creation (e.g. quotas). + Dict keys are the *case-sensitive* names of variables in templatized resource files. + Dict values should be the custom values which get substituted during resource creation. + + :param dict[Text, Text] attributes: Applied in cluster resource creation (e.g. quotas). + """ + self._attributes = attributes + + @property + def attributes(self): + """ + Custom resource attributes which will be applied in cluster resource management + :rtype: dict[Text, Text] + """ + return self._attributes + + def to_flyte_idl(self): + """ + :rtype: flyteidl.admin.matchable_resource_pb2.ClusterResourceAttributes + """ + return _matchable_resource.ClusterResourceAttributes( + attributes=self.attributes, + ) + + @classmethod + def from_flyte_idl(cls, pb2_object): + """ + :param flyteidl.admin.matchable_resource_pb2.ClusterResourceAttributes pb2_object: + :rtype: ClusterResourceAttributes + """ + return cls( + attributes=pb2_object.attributes, + ) + + +class ExecutionQueueAttributes(_common.FlyteIdlEntity): + + def __init__(self, tags): + """ + Tags used for assigning execution queues for tasks matching a project, domain and optionally, workflow. + + :param list[Text] tags: + """ + self._tags = tags + + @property + def tags(self): + """ + :rtype: list[Text] + """ + return self._tags + + def to_flyte_idl(self): + """ + :rtype: flyteidl.admin.matchable_resource_pb2.ExecutionQueueAttributes + """ + return _matchable_resource.ExecutionQueueAttributes( + tags=self.tags, + ) + + @classmethod + def from_flyte_idl(cls, pb2_object): + """ + :param flyteidl.admin.matchable_resource_pb2.ExecutionQueueAttributes pb2_object: + :rtype: ExecutionQueueAttributes + """ + return cls( + tags=pb2_object.tags, + ) + + +class ExecutionClusterLabel(_common.FlyteIdlEntity): + + def __init__(self, value): + """ + Label value to determine where the execution will be run + + :param Text value: + """ + self._value = value + + @property + def value(self): + """ + :rtype: Text + """ + return self._value + + def to_flyte_idl(self): + """ + :rtype: flyteidl.admin.matchable_resource_pb2.ExecutionClusterLabel + """ + return _matchable_resource.ExecutionClusterLabel( + value=self.value, + ) + + @classmethod + def from_flyte_idl(cls, pb2_object): + """ + :param flyteidl.admin.matchable_resource_pb2.ExecutionClusterLabel pb2_object: + :rtype: ExecutionClusterLabel + """ + return cls( + value=pb2_object.value, + ) + + +class MatchingAttributes(_common.FlyteIdlEntity): + def __init__(self, cluster_resource_attributes=None, execution_queue_attributes=None, execution_cluster_label=None): + """ + At most one target from cluster_resource_attributes, execution_queue_attributes or execution_cluster_label + can be set. + :param ClusterResourceAttributes cluster_resource_attributes: + :param ExecutionQueueAttributes execution_queue_attributes: + :param ExecutionClusterLabel execution_cluster_label: + """ + if cluster_resource_attributes: + if execution_queue_attributes or execution_cluster_label: + raise ValueError("Only one target can be set") + elif execution_queue_attributes and execution_cluster_label: + raise ValueError("Only one target can be set") + + self._cluster_resource_attributes = cluster_resource_attributes + self._execution_queue_attributes = execution_queue_attributes + self._execution_cluster_label = execution_cluster_label + + @property + def cluster_resource_attributes(self): + """ + Custom resource attributes which will be applied in cluster resource creation (e.g. quotas). + :rtype: ClusterResourceAttributes + """ + return self._cluster_resource_attributes + + @property + def execution_queue_attributes(self): + """ + Tags used for assigning execution queues for tasks. + :rtype: ExecutionQueueAttributes + """ + return self._execution_queue_attributes + + @property + def execution_cluster_label(self): + """ + Label value to determine where the execution will be run. + :rtype: ExecutionClusterLabel + """ + return self._execution_cluster_label + + def to_flyte_idl(self): + """ + :rtype: flyteidl.admin.matchable_resource_pb2.MatchingAttributes + """ + return _matchable_resource.MatchingAttributes( + cluster_resource_attributes=self.cluster_resource_attributes.to_flyte_idl() if + self.cluster_resource_attributes else None, + execution_queue_attributes=self.execution_queue_attributes.to_flyte_idl() if self.execution_queue_attributes + else None, + execution_cluster_label=self.execution_cluster_label.to_flyte_idl() if self.execution_cluster_label + else None, + ) + + @classmethod + def from_flyte_idl(cls, pb2_object): + """ + :param flyteidl.admin.matchable_resource_pb2.MatchingAttributes pb2_object: + :rtype: MatchingAttributes + """ + return cls( + cluster_resource_attributes=ClusterResourceAttributes.from_flyte_idl( + pb2_object.cluster_resource_attributes) if pb2_object.HasField("cluster_resource_attributes") else None, + execution_queue_attributes=ExecutionQueueAttributes.from_flyte_idl(pb2_object.execution_queue_attributes) if + pb2_object.HasField("execution_queue_attributes") else None, + execution_cluster_label=ExecutionClusterLabel.from_flyte_idl(pb2_object.execution_cluster_label) if + pb2_object.HasField("execution_cluster_label") else None, + ) diff --git a/tests/flytekit/unit/models/test_matchable_resource.py b/tests/flytekit/unit/models/test_matchable_resource.py new file mode 100644 index 0000000000..229f0797e8 --- /dev/null +++ b/tests/flytekit/unit/models/test_matchable_resource.py @@ -0,0 +1,38 @@ +from __future__ import absolute_import + +from flytekit.models import matchable_resource + + +def test_cluster_resource_attributes(): + obj = matchable_resource.ClusterResourceAttributes({"cpu": "one million", "gpu": "just one"}) + assert obj.attributes == {"cpu": "one million", "gpu": "just one"} + assert obj == matchable_resource.ClusterResourceAttributes.from_flyte_idl(obj.to_flyte_idl()) + + +def test_execution_queue_attributes(): + obj = matchable_resource.ExecutionQueueAttributes(["foo", "bar", "baz"]) + assert obj.tags == ["foo", "bar", "baz"] + assert obj == matchable_resource.ExecutionQueueAttributes.from_flyte_idl(obj.to_flyte_idl()) + + +def test_execution_cluster_label(): + obj = matchable_resource.ExecutionClusterLabel("my_cluster") + assert obj.value == "my_cluster" + assert obj == matchable_resource.ExecutionClusterLabel.from_flyte_idl(obj.to_flyte_idl()) + + +def test_matchable_resource(): + cluster_resource_attrs = matchable_resource.ClusterResourceAttributes({"cpu": "one million", "gpu": "just one"}) + obj = matchable_resource.MatchingAttributes(cluster_resource_attributes=cluster_resource_attrs) + assert obj.cluster_resource_attributes == cluster_resource_attrs + assert obj == matchable_resource.MatchingAttributes.from_flyte_idl(obj.to_flyte_idl()) + + execution_queue_attributes = matchable_resource.ExecutionQueueAttributes(["foo", "bar", "baz"]) + obj2 = matchable_resource.MatchingAttributes(execution_queue_attributes=execution_queue_attributes) + assert obj2.execution_queue_attributes == execution_queue_attributes + assert obj2 == matchable_resource.MatchingAttributes.from_flyte_idl(obj2.to_flyte_idl()) + + execution_cluster_label = matchable_resource.ExecutionClusterLabel("my_cluster") + obj2 = matchable_resource.MatchingAttributes(execution_cluster_label=execution_cluster_label) + assert obj2.execution_cluster_label == execution_cluster_label + assert obj2 == matchable_resource.MatchingAttributes.from_flyte_idl(obj2.to_flyte_idl())