Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan committed Aug 14, 2020
1 parent f451cc5 commit 0bd3c90
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 6 deletions.
49 changes: 48 additions & 1 deletion flytekit/clients/friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from flyteidl.admin import task_pb2 as _task_pb2, common_pb2 as _common_pb2, workflow_pb2 as _workflow_pb2, \
launch_plan_pb2 as _launch_plan_pb2, execution_pb2 as _execution_pb2, node_execution_pb2 as _node_execution_pb2, \
task_execution_pb2 as _task_execution_pb2, project_pb2 as _project_pb2, project_domain_attributes_pb2 as \
_project_domain_attributes_pb2, workflow_attributes_pb2 as _workflow_attributes_pb2
_project_domain_attributes_pb2, workflow_attributes_pb2 as _workflow_attributes_pb2, matchable_resource_pb2 as \
_matchable_resource_pb2
from flyteidl.core import identifier_pb2 as _identifier_pb2

from flytekit.clients.raw import RawSynchronousFlyteClient as _RawSynchronousFlyteClient
Expand Down Expand Up @@ -930,3 +931,49 @@ def update_workflow_attributes(self, project, domain, workflow, matching_attribu
)
)
)

def get_project_domain_attributes(self, project, domain, resource_type):
"""
Fetches the custom attributes set for a project and domain combination.
:param Text project:
:param Text domain:
:param flytekit.models.MatchableResource resource_type:
:return:
"""
return super(SynchronousFlyteClient, self).get_project_domain_attributes(
_project_domain_attributes_pb2.ProjectDomainAttributesGetRequest(
project=project,
domain=domain,
resource_type=resource_type,
)
)

def get_workflow_attributes(self, project, domain, workflow, resource_type):
"""
Fetches the custom attributes set for a project, domain, and workflow combination.
:param Text project:
:param Text domain:
:param Text workflow:
:param flytekit.models.MatchableResource resource_type:
:return:
"""
return super(SynchronousFlyteClient, self).get_workflow_attributes(
_workflow_attributes_pb2.WorkflowAttributesGetRequest(
project=project,
domain=domain,
workflow=workflow,
resource_type=resource_type,
)
)

def list_matchable_attributes(self, resource_type):
"""
Fetches all custom attributes for a resource type.
:param flytekit.models.MatchableResource resource_type:
:return:
"""
return super(SynchronousFlyteClient, self).list_matchable_attributes(
_matchable_resource_pb2.ListMatchableAttributesRequest(
resource_type=resource_type,
)
)
35 changes: 31 additions & 4 deletions flytekit/clients/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,8 +604,8 @@ def register_project(self, project_register_request):
def update_project_domain_attributes(self, project_domain_attributes_update_request):
"""
This updates the attributes for a project and domain registered with the Flyte Admin Service
:param flyteidl.admin..ProjectDomainAttributesUpdateRequest project_domain_attributes_update_request:
:rtype: flyteidl.admin..ProjectDomainAttributesUpdateResponse
:param flyteidl.admin.ProjectDomainAttributesUpdateRequest project_domain_attributes_update_request:
:rtype: flyteidl.admin.ProjectDomainAttributesUpdateResponse
"""
return self._stub.UpdateProjectDomainAttributes(project_domain_attributes_update_request,
metadata=self._metadata)
Expand All @@ -614,11 +614,38 @@ def update_project_domain_attributes(self, project_domain_attributes_update_requ
def update_workflow_attributes(self, workflow_attributes_update_request):
"""
This updates the attributes for a project, domain, and workflow registered with the Flyte Admin Service
:param flyteidl.admin..UpdateWorkflowAttributes workflow_attributes_update_request:
:rtype: flyteidl.admin..workflow_attributes_update_requestResponse
:param flyteidl.admin.UpdateWorkflowAttributesRequest workflow_attributes_update_request:
:rtype: flyteidl.admin.WorkflowAttributesUpdateResponse
"""
return self._stub.UpdateWorkflowAttributes(workflow_attributes_update_request, metadata=self._metadata)

@_handle_rpc_error
def get_project_domain_attributes(self, project_domain_attributes_get_request):
"""
This fetches the attributes for a project and domain registered with the Flyte Admin Service
:param flyteidl.admin.ProjectDomainAttributesGetRequest project_domain_attributes_get_request:
:rtype: flyteidl.admin.ProjectDomainAttributesGetResponse
"""
return self._stub.GetProjectDomainAttributes(project_domain_attributes_get_request, metadata=self._metadata)

@_handle_rpc_error
def get_workflow_attributes(self, workflow_attributes_get_request):
"""
This fetches the attributes for a project, domain, and workflow registered with the Flyte Admin Service
:param flyteidl.admin.GetWorkflowAttributesAttributesRequest workflow_attributes_get_request:
:rtype: flyteidl.admin.WorkflowAttributesGetResponse
"""
return self._stub.GetWorkflowAttributes(workflow_attributes_get_request, metadata=self._metadata)

@_handle_rpc_error
def list_matchable_attributes(self, matchable_attributes_list_request):
"""
This fetches the attributes for a specific resource type registered with the Flyte Admin Service
:param flyteidl.admin.ListMatchableAttributesRequest matchable_attributes_list_request:
:rtype: flyteidl.admin.ListMatchableAttributesResponse
"""
return self._stub.ListMatchableAttributes(matchable_attributes_list_request, metadata=self._metadata)

####################################################################################################################
#
# Event Endpoints
Expand Down
59 changes: 58 additions & 1 deletion flytekit/clis/flyte_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from flytekit.models.execution import ExecutionSpec as _ExecutionSpec, ExecutionMetadata as _ExecutionMetadata
from flytekit.models.matchable_resource import ClusterResourceAttributes as _ClusterResourceAttributes,\
ExecutionQueueAttributes as _ExecutionQueueAttributes, ExecutionClusterLabel as _ExecutionClusterLabel,\
MatchingAttributes as _MatchingAttributes
MatchingAttributes as _MatchingAttributes, MatchableResource as _MatchableResource
from flytekit.models.project import Project as _Project
from flytekit.models.schedule import Schedule as _Schedule
from flytekit.common.exceptions import user as _user_exceptions
Expand Down Expand Up @@ -1856,6 +1856,63 @@ def update_execution_cluster_label(host, insecure, project, domain, name, value)
format(project, domain))


@_flyte_cli.command('get-matching-attributes', cls=_FlyteSubCommand)
@_host_option
@_insecure_option
@_project_option
@_domain_option
@_optional_name_option
@_click.option("--resource-type", help="Resource type", required=True,
type=_click.Choice(["task_resource", "cluster_resource", "execution_queue", "execution_cluster_label",
"quality_of_service_specification"]))
def get_matching_attributes(host, insecure, project, domain, name, resource_type):
"""
Fetches the matchable resource of the given resource type for this project, domain and optionally workflow name
combination.
"""
_welcome_message()
client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure)

if name is not None:
attributes = client.get_workflow_attributes(
project, domain, name, _MatchableResource.string_to_enum(resource_type.upper())
)
_click.echo("{}".format(attributes))
else:
attributes = client.get_project_domain_attributes(
project, domain, _MatchableResource.string_to_enum(resource_type.upper())
)
_click.echo("{}".format(attributes))


@_flyte_cli.command('list-matching-attributes', cls=_FlyteSubCommand)
@_host_option
@_insecure_option
@_click.option("--resource-type", help="Resource type", required=True,
type=_click.Choice(["task_resource", "cluster_resource", "execution_queue", "execution_cluster_label",
"quality_of_service_specification"]))
def list_matching_attributes(host, insecure, resource_type):
"""
Fetches all matchable resources of the given resource type.
"""
_welcome_message()
client = _friendly_client.SynchronousFlyteClient(host, insecure=insecure)

attributes = client.list_matchable_attributes(_MatchableResource.string_to_enum(resource_type.upper()))
for configuration in attributes.configurations:
_click.secho(
"{:20} {:20} {:20} {:20}\n".format(
_tt(configuration.project),
_tt(configuration.domain),
_tt(configuration.workflow),
_tt(configuration.launch_plan),
),
fg='blue',
nl=False
)
_click.echo("{}".format(configuration.attributes))


@_flyte_cli.command('setup-config', cls=_click.Command)
@_host_option
@_insecure_option
Expand Down
46 changes: 46 additions & 0 deletions flytekit/models/matchable_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,52 @@
from flytekit.models import common as _common


class MatchableResource(object):
TASK_RESOURCE = _matchable_resource.TASK_RESOURCE
CLUSTER_RESOURCE = _matchable_resource.CLUSTER_RESOURCE
EXECUTION_QUEUE = _matchable_resource.EXECUTION_QUEUE
EXECUTION_CLUSTER_LABEL = _matchable_resource.EXECUTION_CLUSTER_LABEL
QUALITY_OF_SERVICE_SPECIFICATION = _matchable_resource.QUALITY_OF_SERVICE_SPECIFICATION

@classmethod
def enum_to_string(cls, val):
"""
:param int val:
:rtype: Text
"""
if val == cls.TASK_RESOURCE:
return "TASK_RESOURCE"
elif val == cls.CLUSTER_RESOURCE:
return "CLUSTER_RESOURCE"
elif val == cls.EXECUTION_QUEUE:
return "EXECUTION_QUEUE"
elif val == cls.EXECUTION_CLUSTER_LABEL:
return "EXECUTION_CLUSTER_LABEL"
elif val == cls.QUALITY_OF_SERVICE_SPECIFICATION:
return "QUALITY_OF_SERVICE_SPECIFICATION"
else:
return "<UNKNOWN>"

@classmethod
def string_to_enum(cls, val):
"""
:param Text val:
:rtype: int
"""
if val == "TASK_RESOURCE":
return cls.TASK_RESOURCE
elif val == "CLUSTER_RESOURCE":
return cls.CLUSTER_RESOURCE
elif val == "EXECUTION_QUEUE":
return cls.EXECUTION_QUEUE
elif val == "EXECUTION_CLUSTER_LABEL":
return cls.EXECUTION_CLUSTER_LABEL
elif val == cls.QUALITY_OF_SERVICE_SPECIFICATION:
return "QUALITY_OF_SERVICE_SPECIFICATION"
else:
return "<UNKNOWN>"


class ClusterResourceAttributes(_common.FlyteIdlEntity):

def __init__(self, attributes):
Expand Down

0 comments on commit 0bd3c90

Please sign in to comment.