From edfa76739d1064822af44e0addc924e381d3a5ad Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 9 Aug 2023 18:02:18 -0700 Subject: [PATCH] Add tags to execution (#1723) * wip Signed-off-by: Kevin Su * Add tests Signed-off-by: Kevin Su * Use JsonParamType instead Signed-off-by: Kevin Su * update Signed-off-by: Kevin Su * update idl Signed-off-by: Kevin Su * update idl Signed-off-by: Kevin Su * update idl Signed-off-by: Kevin Su * update idl Signed-off-by: Kevin Su * bump grpcio-status version Signed-off-by: Kevin Su * bump grpcioversion Signed-off-by: Kevin Su --------- Signed-off-by: Kevin Su Co-authored-by: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> --- doc-requirements.txt | 2 +- flytekit/clis/sdk_in_container/run.py | 8 +++++++ flytekit/models/execution.py | 9 ++++++++ flytekit/remote/remote.py | 23 +++++++++++++++++++ setup.py | 2 +- .../integration/remote/test_remote.py | 10 +++++++- tests/flytekit/unit/cli/pyflyte/test_run.py | 15 +++++++++++- 7 files changed, 65 insertions(+), 4 deletions(-) diff --git a/doc-requirements.txt b/doc-requirements.txt index 589577d639..6ca7fbc1ee 100644 --- a/doc-requirements.txt +++ b/doc-requirements.txt @@ -249,7 +249,7 @@ flask==2.3.2 # via mlflow flatbuffers==23.5.26 # via tensorflow -flyteidl==1.5.13 +flyteidl==1.5.14 # via flytekit fonttools==4.41.1 # via matplotlib diff --git a/flytekit/clis/sdk_in_container/run.py b/flytekit/clis/sdk_in_container/run.py index 1406276263..e8139108b8 100644 --- a/flytekit/clis/sdk_in_container/run.py +++ b/flytekit/clis/sdk_in_container/run.py @@ -582,6 +582,13 @@ def get_workflow_command_base_params() -> typing.List[click.Option]: type=JsonParamType(), help="Environment variables to set in the container", ), + click.Option( + param_decls=["--tag", "tag"], + required=False, + multiple=True, + type=str, + help="Tags to set for the execution", + ), ] @@ -708,6 +715,7 @@ def _run(*args, **kwargs): type_hints=entity.python_interface.inputs, overwrite_cache=run_level_params.get("overwrite_cache"), envs=run_level_params.get("envs"), + tags=run_level_params.get("tag"), ) console_url = remote.generate_console_url(execution) diff --git a/flytekit/models/execution.py b/flytekit/models/execution.py index 23b4baab01..468adb8884 100644 --- a/flytekit/models/execution.py +++ b/flytekit/models/execution.py @@ -178,6 +178,7 @@ def __init__( security_context: Optional[security.SecurityContext] = None, overwrite_cache: Optional[bool] = None, envs: Optional[_common_models.Envs] = None, + tags: Optional[typing.List[str]] = None, ): """ :param flytekit.models.core.identifier.Identifier launch_plan: Launch plan unique identifier to execute @@ -194,6 +195,7 @@ def __init__( :param security_context: Optional security context to use for this execution. :param overwrite_cache: Optional flag to overwrite the cache for this execution. :param envs: flytekit.models.common.Envs environment variables to set for this execution. + :param tags: Optional list of tags to apply to the execution. """ self._launch_plan = launch_plan self._metadata = metadata @@ -207,6 +209,7 @@ def __init__( self._security_context = security_context self._overwrite_cache = overwrite_cache self._envs = envs + self._tags = tags @property def launch_plan(self): @@ -281,6 +284,10 @@ def overwrite_cache(self) -> Optional[bool]: def envs(self) -> Optional[_common_models.Envs]: return self._envs + @property + def tags(self) -> Optional[typing.List[str]]: + return self._tags + def to_flyte_idl(self): """ :rtype: flyteidl.admin.execution_pb2.ExecutionSpec @@ -300,6 +307,7 @@ def to_flyte_idl(self): security_context=self.security_context.to_flyte_idl() if self.security_context else None, overwrite_cache=self.overwrite_cache, envs=self.envs.to_flyte_idl() if self.envs else None, + tags=self.tags, ) @classmethod @@ -325,6 +333,7 @@ def from_flyte_idl(cls, p): else None, overwrite_cache=p.overwrite_cache, envs=_common_models.Envs.from_flyte_idl(p.envs) if p.HasField("envs") else None, + tags=p.tags, ) diff --git a/flytekit/remote/remote.py b/flytekit/remote/remote.py index 59cdc8c212..519fde3ed3 100644 --- a/flytekit/remote/remote.py +++ b/flytekit/remote/remote.py @@ -955,6 +955,7 @@ def _execute( type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, overwrite_cache: typing.Optional[bool] = None, envs: typing.Optional[typing.Dict[str, str]] = None, + tags: typing.Optional[typing.List[str]] = None, ) -> FlyteWorkflowExecution: """Common method for execution across all entities. @@ -970,6 +971,7 @@ def _execute( for a single execution. If enabled, all calculations are performed even if cached results would be available, overwriting the stored data once execution finishes successfully. :param envs: Environment variables to set for the execution. + :param tags: Tags to set for the execution. :returns: :class:`~flytekit.remote.workflow_execution.FlyteWorkflowExecution` """ execution_name = execution_name or "f" + uuid.uuid4().hex[:19] @@ -1035,6 +1037,7 @@ def _execute( max_parallelism=options.max_parallelism, security_context=options.security_context, envs=common_models.Envs(envs) if envs else None, + tags=tags, ), literal_inputs, ) @@ -1092,6 +1095,7 @@ def execute( type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, overwrite_cache: typing.Optional[bool] = None, envs: typing.Optional[typing.Dict[str, str]] = None, + tags: typing.Optional[typing.List[str]] = None, ) -> FlyteWorkflowExecution: """ Execute a task, workflow, or launchplan, either something that's been declared locally, or a fetched entity. @@ -1129,6 +1133,7 @@ def execute( for a single execution. If enabled, all calculations are performed even if cached results would be available, overwriting the stored data once execution finishes successfully. :param envs: Environment variables to be set for the execution. + :param tags: Tags to be set for the execution. .. note: @@ -1149,6 +1154,7 @@ def execute( type_hints=type_hints, overwrite_cache=overwrite_cache, envs=envs, + tags=tags, ) if isinstance(entity, FlyteWorkflow): return self.execute_remote_wf( @@ -1162,6 +1168,7 @@ def execute( type_hints=type_hints, overwrite_cache=overwrite_cache, envs=envs, + tags=tags, ) if isinstance(entity, PythonTask): return self.execute_local_task( @@ -1176,6 +1183,7 @@ def execute( wait=wait, overwrite_cache=overwrite_cache, envs=envs, + tags=tags, ) if isinstance(entity, WorkflowBase): return self.execute_local_workflow( @@ -1191,6 +1199,7 @@ def execute( wait=wait, overwrite_cache=overwrite_cache, envs=envs, + tags=tags, ) if isinstance(entity, LaunchPlan): return self.execute_local_launch_plan( @@ -1204,6 +1213,7 @@ def execute( wait=wait, overwrite_cache=overwrite_cache, envs=envs, + tags=tags, ) raise NotImplementedError(f"entity type {type(entity)} not recognized for execution") @@ -1222,6 +1232,7 @@ def execute_remote_task_lp( type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, overwrite_cache: typing.Optional[bool] = None, envs: typing.Optional[typing.Dict[str, str]] = None, + tags: typing.Optional[typing.List[str]] = None, ) -> FlyteWorkflowExecution: """Execute a FlyteTask, or FlyteLaunchplan. @@ -1238,6 +1249,7 @@ def execute_remote_task_lp( type_hints=type_hints, overwrite_cache=overwrite_cache, envs=envs, + tags=tags, ) def execute_remote_wf( @@ -1252,6 +1264,7 @@ def execute_remote_wf( type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None, overwrite_cache: typing.Optional[bool] = None, envs: typing.Optional[typing.Dict[str, str]] = None, + tags: typing.Optional[typing.List[str]] = None, ) -> FlyteWorkflowExecution: """Execute a FlyteWorkflow. @@ -1269,6 +1282,7 @@ def execute_remote_wf( type_hints=type_hints, overwrite_cache=overwrite_cache, envs=envs, + tags=tags, ) # Flytekit Entities @@ -1287,6 +1301,7 @@ def execute_local_task( wait: bool = False, overwrite_cache: typing.Optional[bool] = None, envs: typing.Optional[typing.Dict[str, str]] = None, + tags: typing.Optional[typing.List[str]] = None, ) -> FlyteWorkflowExecution: """ Execute a @task-decorated function or TaskTemplate task. @@ -1302,6 +1317,7 @@ def execute_local_task( :param wait: If True, will wait for the execution to complete before returning. :param overwrite_cache: If True, will overwrite the cache. :param envs: Environment variables to set for the execution. + :param tags: Tags to set for the execution. :return: FlyteWorkflowExecution object. """ resolved_identifiers = self._resolve_identifier_kwargs(entity, project, domain, name, version) @@ -1330,6 +1346,7 @@ def execute_local_task( type_hints=entity.python_interface.inputs, overwrite_cache=overwrite_cache, envs=envs, + tags=tags, ) def execute_local_workflow( @@ -1346,6 +1363,7 @@ def execute_local_workflow( wait: bool = False, overwrite_cache: typing.Optional[bool] = None, envs: typing.Optional[typing.Dict[str, str]] = None, + tags: typing.Optional[typing.List[str]] = None, ) -> FlyteWorkflowExecution: """ Execute an @workflow decorated function. @@ -1361,6 +1379,7 @@ def execute_local_workflow( :param wait: :param overwrite_cache: :param envs: + :param tags: :return: """ resolved_identifiers = self._resolve_identifier_kwargs(entity, project, domain, name, version) @@ -1407,6 +1426,7 @@ def execute_local_workflow( type_hints=entity.python_interface.inputs, overwrite_cache=overwrite_cache, envs=envs, + tags=tags, ) def execute_local_launch_plan( @@ -1421,6 +1441,7 @@ def execute_local_launch_plan( wait: bool = False, overwrite_cache: typing.Optional[bool] = None, envs: typing.Optional[typing.Dict[str, str]] = None, + tags: typing.Optional[typing.List[str]] = None, ) -> FlyteWorkflowExecution: """ @@ -1434,6 +1455,7 @@ def execute_local_launch_plan( :param wait: If True, will wait for the execution to complete before returning. :param overwrite_cache: If True, will overwrite the cache. :param envs: Environment variables to be passed into the execution. + :param tags: Tags to be passed into the execution. :return: FlyteWorkflowExecution object """ try: @@ -1461,6 +1483,7 @@ def execute_local_launch_plan( type_hints=entity.python_interface.inputs, overwrite_cache=overwrite_cache, envs=envs, + tags=tags, ) ################################### diff --git a/setup.py b/setup.py index 6feba76526..b758a3178d 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ }, install_requires=[ "googleapis-common-protos>=1.57", - "flyteidl>=1.5.12", + "flyteidl>=1.5.14", "wheel>=0.30.0,<1.0.0", "pandas>=1.0.0,<2.0.0", "pyarrow>=4.0.0,<11.0.0", diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 3466a48d92..72b3cb9192 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -173,10 +173,18 @@ def test_execute_python_task(flyteclient, flyte_workflows_register, flyte_remote remote = FlyteRemote(Config.auto(), PROJECT, "development") execution = remote.execute( - t1, inputs={"a": 10}, version=f"v{VERSION}", wait=True, overwrite_cache=True, envs={"foo": "bar"} + t1, + inputs={"a": 10}, + version=f"v{VERSION}", + wait=True, + overwrite_cache=True, + envs={"foo": "bar"}, + tags=["flyte"], ) assert execution.outputs["t1_int_output"] == 12 assert execution.outputs["c"] == "world" + assert execution.spec.envs == {"foo": "bar"} + assert execution.spec.tags == ["flyte"] def test_execute_python_workflow_and_launch_plan(flyteclient, flyte_workflows_register, flyte_remote_env): diff --git a/tests/flytekit/unit/cli/pyflyte/test_run.py b/tests/flytekit/unit/cli/pyflyte/test_run.py index 5f0671fe5d..a2cbda2115 100644 --- a/tests/flytekit/unit/cli/pyflyte/test_run.py +++ b/tests/flytekit/unit/cli/pyflyte/test_run.py @@ -168,7 +168,20 @@ def test_union_type2(input): env = '{"foo": "bar"}' result = runner.invoke( pyflyte.main, - ["run", "--overwrite-cache", "--envs", env, os.path.join(DIR_NAME, "workflow.py"), "test_union2", "--a", input], + [ + "run", + "--overwrite-cache", + "--envs", + env, + "--tag", + "flyte", + "--tag", + "hello", + os.path.join(DIR_NAME, "workflow.py"), + "test_union2", + "--a", + input, + ], catch_exceptions=False, ) print(result.stdout)