Skip to content

Commit

Permalink
feat(sdk): add experiment_id parameter to create run methods
Browse files Browse the repository at this point in the history
Co-authored-by: alenawang <[email protected]>
Co-authored-by: andreafehrman <[email protected]>
Co-authored-by: owmasch <[email protected]>
Co-authored-by: ryanrusson <[email protected]>
  • Loading branch information
5 people committed Mar 20, 2023
1 parent a9ead5a commit 5be0a6c
Showing 1 changed file with 30 additions and 16 deletions.
46 changes: 30 additions & 16 deletions sdk/python/kfp/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ def get_pipeline_id(self, name: str) -> Optional[str]:
return result.pipelines[0].id
elif len(result.pipelines) > 1:
raise ValueError(
f'Multiple pipelines with the name: {name} found, the name needs to be unique'
f'Multiple pipelines with the name: {name} found, the name needs to be unique.'
)
return None

Expand Down Expand Up @@ -572,7 +572,7 @@ def get_experiment(
namespace = namespace or self.get_user_namespace()
if experiment_id is None and experiment_name is None:
raise ValueError(
'Either experiment_id or experiment_name is required')
'Either experiment_id or experiment_name is required.')
if experiment_id is not None:
return self._experiment_api.get_experiment(id=experiment_id)
experiment_filter = json.dumps({
Expand Down Expand Up @@ -916,7 +916,7 @@ def create_recurring_run(
if all([interval_second, cron_expression
]) or not any([interval_second, cron_expression]):
raise ValueError(
'Either interval_second or cron_expression is required')
'Either interval_second or cron_expression is required.')
if interval_second is not None:
trigger = kfp_server_api.ApiTrigger(
periodic_schedule=kfp_server_api.ApiPeriodicSchedule(
Expand Down Expand Up @@ -1025,6 +1025,7 @@ def create_run_from_pipeline_func(
pipeline_root: Optional[str] = None,
enable_caching: Optional[bool] = None,
service_account: Optional[str] = None,
experiment_id: Optional[str] = None,
) -> RunPipelineResult:
"""Runs pipeline on KFP-enabled Kubernetes cluster.
Expand All @@ -1046,6 +1047,7 @@ def create_run_from_pipeline_func(
compile time settings).
service_account: Specifies which Kubernetes service
account to use for this run.
experiment_id: ID of the experiment to add the run to.
Returns:
``RunPipelineResult`` object containing information about the pipeline run.
Expand All @@ -1066,6 +1068,7 @@ def create_run_from_pipeline_func(
pipeline_file=pipeline_package_path,
arguments=arguments,
run_name=run_name,
experiment_id=experiment_id,
experiment_name=experiment_name,
namespace=namespace,
pipeline_root=pipeline_root,
Expand All @@ -1083,6 +1086,7 @@ def create_run_from_pipeline_package(
pipeline_root: Optional[str] = None,
enable_caching: Optional[bool] = None,
service_account: Optional[str] = None,
experiment_id: Optional[str] = None,
) -> RunPipelineResult:
"""Runs pipeline on KFP-enabled Kubernetes cluster.
Expand All @@ -1104,29 +1108,39 @@ def create_run_from_pipeline_package(
compile time settings).
service_account: Specifies which Kubernetes service
account to use for this run.
experiment_id: ID of the experiment to add the run to.
Returns:
``RunPipelineResult`` object containing information about the pipeline run.
"""

#TODO: Check arguments against the pipeline function
pipeline_name = os.path.basename(pipeline_file)
experiment_name = experiment_name or os.environ.get(
KF_PIPELINES_DEFAULT_EXPERIMENT_NAME, None)
overridden_experiment_name = os.environ.get(
KF_PIPELINES_OVERRIDE_EXPERIMENT_NAME, experiment_name)
if overridden_experiment_name != experiment_name:
warnings.warn(
f'Changing experiment name from "{experiment_name}" to "{overridden_experiment_name}".'
)
experiment_name = overridden_experiment_name or 'Default'

if (experiment_name is not None) and (experiment_id is not None):
raise ValueError(
'You cannot specify both experiment_name and experiment_id.')

if not experiment_id:
experiment_name = experiment_name or os.environ.get(
KF_PIPELINES_DEFAULT_EXPERIMENT_NAME, None)
overridden_experiment_name = os.environ.get(
KF_PIPELINES_OVERRIDE_EXPERIMENT_NAME, experiment_name)
if overridden_experiment_name != experiment_name:
warnings.warn(
f'Changing experiment name from "{experiment_name}" to "{overridden_experiment_name}".'
)
experiment_name = overridden_experiment_name or 'Default'
experiment = self.create_experiment(
name=experiment_name, namespace=namespace)
experiment_id = experiment.id

run_name = run_name or (
pipeline_name + ' ' +
datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S'))
experiment = self.create_experiment(
name=experiment_name, namespace=namespace)

run_info = self.run_pipeline(
experiment_id=experiment.id,
experiment_id=experiment_id,
job_name=run_name,
pipeline_package_path=pipeline_file,
params=arguments,
Expand Down Expand Up @@ -1411,7 +1425,7 @@ def upload_pipeline_version(

if all([pipeline_id, pipeline_name
]) or not any([pipeline_id, pipeline_name]):
raise ValueError('Either pipeline_id or pipeline_name is required')
raise ValueError('Either pipeline_id or pipeline_name is required.')

if pipeline_name:
pipeline_id = self.get_pipeline_id(pipeline_name)
Expand Down

0 comments on commit 5be0a6c

Please sign in to comment.