From 4c9d4f5786ba999d1fa66346699607803c5e4b5c Mon Sep 17 00:00:00 2001 From: droctothorpe Date: Fri, 17 Mar 2023 09:37:14 -0400 Subject: [PATCH] feat(sdk): add experiment_id parameter to create run methods Co-authored-by: alenawang Co-authored-by: andreafehrman Co-authored-by: owmasch Co-authored-by: ryanrusson --- sdk/python/kfp/client/client.py | 50 +++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 18 deletions(-) diff --git a/sdk/python/kfp/client/client.py b/sdk/python/kfp/client/client.py index 02306318ca3..34a5170a95c 100644 --- a/sdk/python/kfp/client/client.py +++ b/sdk/python/kfp/client/client.py @@ -507,7 +507,7 @@ def get_pipeline_id(self, name: str) -> Optional[str]: return result.pipelines[0].id elif len(result.pipelines) > 1: raise ValueError( - f'Multiple pipelines with the name: {name} found, the name needs to be unique' + f'Multiple pipelines with the name: {name} found, the name needs to be unique.' ) return None @@ -572,7 +572,7 @@ def get_experiment( namespace = namespace or self.get_user_namespace() if experiment_id is None and experiment_name is None: raise ValueError( - 'Either experiment_id or experiment_name is required') + 'Either experiment_id or experiment_name is required.') if experiment_id is not None: return self._experiment_api.get_experiment(id=experiment_id) experiment_filter = json.dumps({ @@ -916,7 +916,7 @@ def create_recurring_run( if all([interval_second, cron_expression ]) or not any([interval_second, cron_expression]): raise ValueError( - 'Either interval_second or cron_expression is required') + 'Either interval_second or cron_expression is required.') if interval_second is not None: trigger = kfp_server_api.ApiTrigger( periodic_schedule=kfp_server_api.ApiPeriodicSchedule( @@ -1025,6 +1025,7 @@ def create_run_from_pipeline_func( pipeline_root: Optional[str] = None, enable_caching: Optional[bool] = None, service_account: Optional[str] = None, + experiment_id: Optional[str] = None, ) -> RunPipelineResult: """Runs pipeline on KFP-enabled Kubernetes cluster. @@ -1035,7 +1036,7 @@ def create_run_from_pipeline_func( pipeline_func: Pipeline function constructed with ``@kfp.dsl.pipeline`` decorator. arguments: Arguments to the pipeline function provided as a dict. run_name: Name of the run to be shown in the UI. - experiment_name: Name of the experiment to add the run to. + experiment_name: Name of the experiment to add the run to. You cannot specify both experiment_name and experiment_id. namespace: Kubernetes namespace to use. Used for multi-user deployments. For single-user deployments, this should be left as ``None``. pipeline_root: Root path of the pipeline outputs. enable_caching: Whether or not to enable caching for the @@ -1046,6 +1047,7 @@ def create_run_from_pipeline_func( compile time settings). service_account: Specifies which Kubernetes service account to use for this run. + experiment_id: ID of the experiment to add the run to. You cannot specify both experiment_id and experiment_name. Returns: ``RunPipelineResult`` object containing information about the pipeline run. @@ -1066,6 +1068,7 @@ def create_run_from_pipeline_func( pipeline_file=pipeline_package_path, arguments=arguments, run_name=run_name, + experiment_id=experiment_id, experiment_name=experiment_name, namespace=namespace, pipeline_root=pipeline_root, @@ -1083,6 +1086,7 @@ def create_run_from_pipeline_package( pipeline_root: Optional[str] = None, enable_caching: Optional[bool] = None, service_account: Optional[str] = None, + experiment_id: Optional[str] = None, ) -> RunPipelineResult: """Runs pipeline on KFP-enabled Kubernetes cluster. @@ -1093,7 +1097,7 @@ def create_run_from_pipeline_package( pipeline_file: A compiled pipeline package file. arguments: Arguments to the pipeline function provided as a dict. run_name: Name of the run to be shown in the UI. - experiment_name: Name of the experiment to add the run to. + experiment_name: Name of the experiment to add the run to. You cannot specify both experiment_name and experiment_id. namespace: Kubernetes namespace to use. Used for multi-user deployments. For single-user deployments, this should be left as ``None``. pipeline_root: Root path of the pipeline outputs. enable_caching: Whether or not to enable caching for the @@ -1104,6 +1108,7 @@ def create_run_from_pipeline_package( compile time settings). service_account: Specifies which Kubernetes service account to use for this run. + experiment_id: ID of the experiment to add the run to. You cannot specify both experiment_id and experiment_name. Returns: ``RunPipelineResult`` object containing information about the pipeline run. @@ -1111,22 +1116,31 @@ def create_run_from_pipeline_package( #TODO: Check arguments against the pipeline function pipeline_name = os.path.basename(pipeline_file) - experiment_name = experiment_name or os.environ.get( - KF_PIPELINES_DEFAULT_EXPERIMENT_NAME, None) - overridden_experiment_name = os.environ.get( - KF_PIPELINES_OVERRIDE_EXPERIMENT_NAME, experiment_name) - if overridden_experiment_name != experiment_name: - warnings.warn( - f'Changing experiment name from "{experiment_name}" to "{overridden_experiment_name}".' - ) - experiment_name = overridden_experiment_name or 'Default' + + if (experiment_name is not None) and (experiment_id is not None): + raise ValueError( + 'You cannot specify both experiment_name and experiment_id.') + + if not experiment_id: + experiment_name = experiment_name or os.environ.get( + KF_PIPELINES_DEFAULT_EXPERIMENT_NAME, None) + overridden_experiment_name = os.environ.get( + KF_PIPELINES_OVERRIDE_EXPERIMENT_NAME, experiment_name) + if overridden_experiment_name != experiment_name: + warnings.warn( + f'Changing experiment name from "{experiment_name}" to "{overridden_experiment_name}".' + ) + experiment_name = overridden_experiment_name or 'Default' + experiment = self.create_experiment( + name=experiment_name, namespace=namespace) + experiment_id = experiment.id + run_name = run_name or ( pipeline_name + ' ' + datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S')) - experiment = self.create_experiment( - name=experiment_name, namespace=namespace) + run_info = self.run_pipeline( - experiment_id=experiment.id, + experiment_id=experiment_id, job_name=run_name, pipeline_package_path=pipeline_file, params=arguments, @@ -1411,7 +1425,7 @@ def upload_pipeline_version( if all([pipeline_id, pipeline_name ]) or not any([pipeline_id, pipeline_name]): - raise ValueError('Either pipeline_id or pipeline_name is required') + raise ValueError('Either pipeline_id or pipeline_name is required.') if pipeline_name: pipeline_id = self.get_pipeline_id(pipeline_name)