diff --git a/sdk/python/kfp/cli/experiment.py b/sdk/python/kfp/cli/experiment.py
index 2212148ef66..cf94fd904fc 100644
--- a/sdk/python/kfp/cli/experiment.py
+++ b/sdk/python/kfp/cli/experiment.py
@@ -126,7 +126,7 @@ def archive(ctx: click.Context, experiment_id: str, experiment_name: str):
if not experiment_id:
experiment = client_obj.get_experiment(experiment_name=experiment_name)
- experiment_id = experiment.id
+ experiment_id = experiment.experiment_id
client_obj.archive_experiment(experiment_id=experiment_id)
if experiment_id:
@@ -162,7 +162,7 @@ def unarchive(ctx: click.Context, experiment_id: str, experiment_name: str):
if not experiment_id:
experiment = client_obj.get_experiment(experiment_name=experiment_name)
- experiment_id = experiment.id
+ experiment_id = experiment.experiment_id
client_obj.unarchive_experiment(experiment_id=experiment_id)
if experiment_id:
diff --git a/sdk/python/kfp/cli/output.py b/sdk/python/kfp/cli/output.py
index 8202dc78020..23764715f6f 100644
--- a/sdk/python/kfp/cli/output.py
+++ b/sdk/python/kfp/cli/output.py
@@ -16,7 +16,7 @@
import datetime
import enum
import json
-from typing import Any, Dict, Union
+from typing import Any, Dict
import click
import kfp_server_api
@@ -45,17 +45,6 @@ class OutputFormat(enum.Enum):
json = 'json'
-RUN_STORAGE_STATE_MAP = {
- kfp_server_api.ApiRunStorageState.AVAILABLE: 'Available',
- kfp_server_api.ApiRunStorageState.ARCHIVED: 'Archived',
-}
-EXPERIMENT_STORAGE_STATE_MAP = {
- kfp_server_api.ApiExperimentStorageState.AVAILABLE: 'Available',
- kfp_server_api.ApiExperimentStorageState.ARCHIVED: 'Archived',
- kfp_server_api.ApiExperimentStorageState.UNSPECIFIED: 'Unspecified',
-}
-
-
def snake_to_header(string: str) -> str:
"""Converts a snake case string to a table header by replacing underscores
with spaces and making uppercase.
@@ -74,18 +63,17 @@ class ExperimentData:
id: str
name: str
created_at: str
- state: str
+ storage_state: str
-def transform_experiment(exp: kfp_server_api.ApiExperiment) -> Dict[str, Any]:
+def transform_experiment(
+ exp: kfp_server_api.V2beta1Experiment) -> Dict[str, Any]:
return dataclasses.asdict(
ExperimentData(
- id=exp.id,
- name=exp.name,
+ id=exp.experiment_id,
+ name=exp.display_name,
created_at=exp.created_at.isoformat(),
- state=EXPERIMENT_STORAGE_STATE_MAP.get(
- exp.storage_state, EXPERIMENT_STORAGE_STATE_MAP[
- kfp_server_api.ApiExperimentStorageState.AVAILABLE])))
+ storage_state=exp.storage_state))
@dataclasses.dataclass
@@ -93,20 +81,16 @@ class PipelineData:
id: str
name: str
created_at: str
- default_version: str
-def transform_pipeline(pipeline: kfp_server_api.ApiPipeline) -> Dict[str, Any]:
- default_version_id = pipeline.default_version.id if hasattr(
- pipeline,
- 'default_version') and pipeline.default_version is not None and hasattr(
- pipeline.default_version, 'id') else None
+def transform_pipeline(
+ pipeline: kfp_server_api.V2beta1Pipeline) -> Dict[str, Any]:
return dataclasses.asdict(
PipelineData(
- id=pipeline.id,
- name=pipeline.name,
+ id=pipeline.pipeline_id,
+ name=pipeline.display_name,
created_at=pipeline.created_at.isoformat(),
- default_version=default_version_id))
+ ))
@dataclasses.dataclass
@@ -118,16 +102,14 @@ class PipelineVersionData:
def transform_pipeline_version(
- pipeline_version: kfp_server_api.ApiPipelineVersion) -> Dict[str, Any]:
- parent_id = next(
- rr for rr in pipeline_version.resource_references
- if rr.relationship == kfp_server_api.ApiRelationship.OWNER).key.id
+ pipeline_version: kfp_server_api.V2beta1PipelineVersion
+) -> Dict[str, Any]:
return dataclasses.asdict(
PipelineVersionData(
- id=pipeline_version.id,
- name=pipeline_version.name,
+ id=pipeline_version.pipeline_version_id,
+ name=pipeline_version.display_name,
created_at=pipeline_version.created_at.isoformat(),
- parent_id=parent_id,
+ parent_id=pipeline_version.pipeline_id,
))
@@ -136,26 +118,22 @@ class RunData:
id: str
name: str
created_at: str
- status: str
state: str
+ storage_state: str
-def transform_run(
- run: Union[kfp_server_api.ApiRun, kfp_server_api.ApiRunDetail]
-) -> Dict[str, Any]:
+def transform_run(run: kfp_server_api.V2beta1Run) -> Dict[str, Any]:
return dataclasses.asdict((RunData(
- id=run.id,
- name=run.name,
+ id=run.run_id,
+ name=run.display_name,
created_at=run.created_at.isoformat(),
- status=run.status,
- state=RUN_STORAGE_STATE_MAP.get(
- run.storage_state,
- RUN_STORAGE_STATE_MAP[kfp_server_api.ApiRunStorageState.AVAILABLE]))
- ))
+ state=run.state,
+ storage_state=run.storage_state,
+ )))
@dataclasses.dataclass
-class JobData:
+class RecurringRunData:
id: str
name: str
created_at: str
@@ -163,16 +141,14 @@ class JobData:
status: str
-def transform_job(recurring_run: kfp_server_api.ApiJob) -> Dict[str, Any]:
- experiment_id = next(
- rr for rr in recurring_run.resource_references
- if rr.key.type == kfp_server_api.ApiResourceType.EXPERIMENT).key.id
+def transform_recurring_run(
+ recurring_run: kfp_server_api.V2beta1RecurringRun) -> Dict[str, Any]:
return dataclasses.asdict(
- JobData(
- id=recurring_run.id,
- name=recurring_run.name,
+ RecurringRunData(
+ id=recurring_run.recurring_run_id,
+ name=recurring_run.display_name,
created_at=recurring_run.created_at.isoformat(),
- experiment_id=experiment_id,
+ experiment_id=recurring_run.experiment_id,
status=recurring_run.status))
@@ -183,7 +159,7 @@ class ModelType(enum.Enum):
PIPELINE = 'PIPELINE'
PIPELINE_VERSION = 'PIPELINE_VERSION'
RUN = 'RUN'
- JOB = 'JOB'
+ RECURRING_RUN = 'RECURRING_RUN'
transformer_map = {
@@ -191,7 +167,7 @@ class ModelType(enum.Enum):
ModelType.PIPELINE: transform_pipeline,
ModelType.PIPELINE_VERSION: transform_pipeline_version,
ModelType.RUN: transform_run,
- ModelType.JOB: transform_job,
+ ModelType.RECURRING_RUN: transform_recurring_run,
}
dataclass_map = {
@@ -199,7 +175,7 @@ class ModelType(enum.Enum):
ModelType.PIPELINE: PipelineData,
ModelType.PIPELINE_VERSION: PipelineVersionData,
ModelType.RUN: RunData,
- ModelType.JOB: JobData,
+ ModelType.RECURRING_RUN: RecurringRunData,
}
diff --git a/sdk/python/kfp/cli/pipeline.py b/sdk/python/kfp/cli/pipeline.py
index 205d179567d..3631f0ed4b2 100644
--- a/sdk/python/kfp/cli/pipeline.py
+++ b/sdk/python/kfp/cli/pipeline.py
@@ -191,16 +191,17 @@ def list_versions(ctx: click.Context, pipeline_id: str, page_token: str,
sort_by=sort_by,
filter=filter)
output.print_output(
- response.versions or [],
- output.ModelType.PIPELINE,
+ response.pipeline_versions or [],
+ output.ModelType.PIPELINE_VERSION,
output_format,
)
@pipeline.command()
+@click.argument('pipeline-id')
@click.argument('version-id')
@click.pass_context
-def delete_version(ctx: click.Context, version_id: str):
+def delete_version(ctx: click.Context, pipeline_id: str, version_id: str):
"""Delete a version of a pipeline."""
confirmation = f'Are you sure you want to delete pipeline version {version_id}?'
if not click.confirm(confirmation):
@@ -209,7 +210,8 @@ def delete_version(ctx: click.Context, version_id: str):
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
- client_obj.delete_pipeline_version(version_id)
+ client_obj.delete_pipeline_version(
+ pipeline_id=pipeline_id, pipeline_version_id=version_id)
output.print_deleted_text('pipeline version', version_id, output_format)
@@ -230,14 +232,16 @@ def get(ctx: click.Context, pipeline_id: str):
@pipeline.command()
+@click.argument('pipeline-id')
@click.argument('version-id')
@click.pass_context
-def get_version(ctx: click.Context, version_id: str):
+def get_version(ctx: click.Context, pipeline_id: str, version_id: str):
"""Get information about a version of a pipeline."""
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
- version = client_obj.get_pipeline_version(version_id=version_id)
+ version = client_obj.get_pipeline_version(
+ pipeline_id=pipeline_id, pipeline_version_id=version_id)
output.print_output(
version,
output.ModelType.PIPELINE,
diff --git a/sdk/python/kfp/cli/recurring_run.py b/sdk/python/kfp/cli/recurring_run.py
index 9a6a1df4d59..c0f7004cf6c 100644
--- a/sdk/python/kfp/cli/recurring_run.py
+++ b/sdk/python/kfp/cli/recurring_run.py
@@ -135,7 +135,7 @@ def create(ctx: click.Context,
raise ValueError(either_option_required)
if not experiment_id:
experiment = client_obj.create_experiment(experiment_name)
- experiment_id = experiment.id
+ experiment_id = experiment.experiment_id
# Ensure we only split on the first equals char so the value can contain
# equals signs too.
@@ -159,7 +159,7 @@ def create(ctx: click.Context,
version_id=version_id)
output.print_output(
recurring_run,
- output.ModelType.JOB,
+ output.ModelType.RECURRING_RUN,
output_format,
)
@@ -202,73 +202,75 @@ def list(ctx: click.Context, experiment_id: str, page_token: str, max_size: int,
sort_by=sort_by,
filter=filter)
output.print_output(
- response.jobs or [],
- output.ModelType.JOB,
+ response.recurring_runs or [],
+ output.ModelType.RECURRING_RUN,
output_format,
)
@recurring_run.command()
-@click.argument('job-id')
+@click.argument('recurring-run-id')
@click.pass_context
-def get(ctx: click.Context, job_id: str):
+def get(ctx: click.Context, recurring_run_id: str):
"""Get information about a recurring run."""
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
- recurring_run = client_obj.get_recurring_run(job_id)
+ recurring_run = client_obj.get_recurring_run(recurring_run_id)
output.print_output(
recurring_run,
- output.ModelType.JOB,
+ output.ModelType.RECURRING_RUN,
output_format,
)
@recurring_run.command()
-@click.argument('job-id')
+@click.argument('recurring-run-id')
@click.pass_context
-def delete(ctx: click.Context, job_id: str):
+def delete(ctx: click.Context, recurring_run_id: str):
"""Delete a recurring run."""
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
- confirmation = f'Are you sure you want to delete job {job_id}?'
+    confirmation = f'Are you sure you want to delete recurring run {recurring_run_id}?'
if not click.confirm(confirmation):
return
- client_obj.delete_job(job_id)
- output.print_deleted_text('job', job_id, output_format)
+ client_obj.delete_recurring_run(recurring_run_id)
+ output.print_deleted_text('recurring_run', recurring_run_id, output_format)
@recurring_run.command()
-@click.argument('job-id')
+@click.argument('recurring-run-id')
@click.pass_context
-def enable(ctx: click.Context, job_id: str):
+def enable(ctx: click.Context, recurring_run_id: str):
"""Enable a recurring run."""
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
- client_obj.enable_job(job_id=job_id)
+ client_obj.enable_recurring_run(recurring_run_id=recurring_run_id)
# TODO: add wait option, since enable takes time to complete
- recurring_run = client_obj.get_recurring_run(job_id=job_id)
+ recurring_run = client_obj.get_recurring_run(
+ recurring_run_id=recurring_run_id)
output.print_output(
recurring_run,
- output.ModelType.JOB,
+ output.ModelType.RECURRING_RUN,
output_format,
)
@recurring_run.command()
-@click.argument('job-id')
+@click.argument('recurring-run-id')
@click.pass_context
-def disable(ctx: click.Context, job_id: str):
+def disable(ctx: click.Context, recurring_run_id: str):
"""Disable a recurring run."""
client_obj: client.Client = ctx.obj['client']
output_format = ctx.obj['output']
- client_obj.disable_job(job_id=job_id)
+ client_obj.disable_recurring_run(recurring_run_id=recurring_run_id)
# TODO: add wait option, since disable takes time to complete
- recurring_run = client_obj.get_recurring_run(job_id=job_id)
+ recurring_run = client_obj.get_recurring_run(
+ recurring_run_id=recurring_run_id)
output.print_output(
recurring_run,
- output.ModelType.JOB,
+ output.ModelType.RECURRING_RUN,
output_format,
)
diff --git a/sdk/python/kfp/cli/run.py b/sdk/python/kfp/cli/run.py
index 06b94251bef..5cc22208e42 100644
--- a/sdk/python/kfp/cli/run.py
+++ b/sdk/python/kfp/cli/run.py
@@ -11,9 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import json
-import shutil
-import subprocess
import sys
import time
from typing import List
@@ -133,21 +130,21 @@ def create(ctx: click.Context, experiment_name: str, run_name: str,
experiment = client_obj.create_experiment(experiment_name)
run = client_obj.run_pipeline(
- experiment_id=experiment.id,
+ experiment_id=experiment.experiment_id,
job_name=run_name,
pipeline_package_path=package_file,
params=arg_dict,
pipeline_id=pipeline_id,
version_id=version)
if timeout > 0:
- run_detail = client_obj.wait_for_run_completion(run.id, timeout)
+ run_detail = client_obj.wait_for_run_completion(run.run_id, timeout)
output.print_output(
- run_detail.run,
+ run_detail,
output.ModelType.RUN,
output_format,
)
else:
- display_run(client_obj, namespace, run.id, watch, output_format)
+ display_run(client_obj, run.run_id, watch, output_format)
@run.command()
@@ -175,7 +172,7 @@ def get(ctx: click.Context, watch: bool, detail: bool, run_id: str):
click.echo(
'The --detail/-d flag is deprecated. Please use --output=json instead.',
err=True)
- display_run(client_obj, namespace, run_id, watch, output_format)
+ display_run(client_obj, run_id, watch, output_format)
@run.command()
@@ -189,7 +186,7 @@ def archive(ctx: click.Context, run_id: str):
client_obj.archive_run(run_id=run_id)
run = client_obj.get_run(run_id=run_id)
output.print_output(
- run.run,
+ run,
output.ModelType.RUN,
output_format,
)
@@ -205,7 +202,7 @@ def unarchive(ctx: click.Context, run_id: str):
client_obj.unarchive_run(run_id=run_id)
run = client_obj.get_run(run_id=run_id)
output.print_output(
- run.run,
+ run,
output.ModelType.RUN,
output_format,
)
@@ -228,9 +225,9 @@ def delete(ctx: click.Context, run_id: str):
output.print_deleted_text('run', run_id, output_format)
-def display_run(client: client.Client, namespace: str, run_id: str, watch: bool,
+def display_run(client: client.Client, run_id: str, watch: bool,
output_format: str):
- run = client.get_run(run_id).run
+ run = client.get_run(run_id)
output.print_output(
run,
@@ -239,32 +236,12 @@ def display_run(client: client.Client, namespace: str, run_id: str, watch: bool,
)
if not watch:
return
- argo_path = shutil.which('argo')
- if not argo_path:
- raise RuntimeError(
- "argo isn't found in $PATH. It's necessary for watch. "
- "Please make sure it's installed and available. "
- 'Installation instructions be found here - '
- 'https://github.com/argoproj/argo-workflows/releases')
-
- argo_workflow_name = None
while True:
time.sleep(1)
run_detail = client.get_run(run_id)
- run = run_detail.run
- if run_detail.pipeline_runtime and run_detail.pipeline_runtime.workflow_manifest:
- manifest = json.loads(run_detail.pipeline_runtime.workflow_manifest)
- if manifest['metadata'] and manifest['metadata']['name']:
- argo_workflow_name = manifest['metadata']['name']
- break
- if run_detail.run.status in ['Succeeded', 'Skipped', 'Failed', 'Error']:
- click.echo(f'Run is finished with status {run_detail.run.status}.')
+ run = run_detail
+ if run_detail.state in [
+ 'SUCCEEDED', 'SKIPPED', 'FAILED', 'CANCELED', 'PAUSED'
+ ]:
+ click.echo(f'Run is finished with state {run_detail.state}.')
return
- if argo_workflow_name:
- subprocess.run(
- [argo_path, 'watch', argo_workflow_name, '-n', namespace])
- output.print_output(
- run,
- output.ModelType.RUN,
- output_format,
- )
diff --git a/sdk/python/kfp/client/client.py b/sdk/python/kfp/client/client.py
index 34a5170a95c..690ce5dd5e5 100644
--- a/sdk/python/kfp/client/client.py
+++ b/sdk/python/kfp/client/client.py
@@ -14,6 +14,7 @@
"""The SDK client for Kubeflow Pipelines API."""
import copy
+import dataclasses
import datetime
import json
import logging
@@ -37,13 +38,14 @@
# Operators on scalar values. Only applies to one of |int_value|,
# |long_value|, |string_value| or |timestamp_value|.
_FILTER_OPERATIONS = {
- 'UNKNOWN': 0,
'EQUALS': 1,
'NOT_EQUALS': 2,
'GREATER_THAN': 3,
'GREATER_THAN_EQUALS': 5,
'LESS_THAN': 6,
- 'LESS_THAN_EQUALS': 7
+ 'LESS_THAN_EQUALS': 7,
+ 'IN': 8,
+ 'IS_SUBSTRING': 9,
}
KF_PIPELINES_ENDPOINT_ENV = 'KF_PIPELINES_ENDPOINT'
@@ -55,22 +57,20 @@
KF_PIPELINES_APP_OAUTH2_CLIENT_SECRET_ENV = 'KF_PIPELINES_APP_OAUTH2_CLIENT_SECRET'
-class JobConfig:
-
- def __init__(
- self, spec: kfp_server_api.ApiPipelineSpec,
- resource_references: kfp_server_api.ApiResourceReference) -> None:
- self.spec = spec
- self.resource_references = resource_references
+@dataclasses.dataclass
+class _JobConfig:
+ pipeline_spec: dict
+ pipeline_version_reference: kfp_server_api.V2beta1PipelineVersionReference
+ runtime_config: kfp_server_api.V2beta1RuntimeConfig
class RunPipelineResult:
def __init__(self, client: 'Client',
- run_info: kfp_server_api.ApiRun) -> None:
+ run_info: kfp_server_api.V2beta1Run) -> None:
self._client = client
self.run_info = run_info
- self.run_id = run_info.id
+ self.run_id = run_info.run_id
def wait_for_run_completion(self, timeout=None):
timeout = timeout or datetime.timedelta.max
@@ -176,7 +176,8 @@ def __init__(
header_value=self._context_setting.get(
'client_authentication_header_value'))
_add_generated_apis(self, kfp_server_api, api_client)
- self._job_api = kfp_server_api.JobServiceApi(api_client)
+ self._recurring_run_api = kfp_server_api.RecurringRunServiceApi(
+ api_client)
self._run_api = kfp_server_api.RunServiceApi(api_client)
self._experiment_api = kfp_server_api.ExperimentServiceApi(api_client)
self._pipelines_api = kfp_server_api.PipelineServiceApi(api_client)
@@ -392,8 +393,9 @@ def set_user_namespace(self, namespace: str) -> None:
json.dump(self._context_setting, f)
def get_kfp_healthz(
- self,
- sleep_duration: int = 5) -> kfp_server_api.ApiGetHealthzResponse:
+ self,
+ sleep_duration: int = 5,
+ ) -> kfp_server_api.V2beta1GetHealthzResponse:
"""Gets healthz info for KFP deployment.
Args:
@@ -433,10 +435,11 @@ def get_user_namespace(self) -> str:
return self._context_setting['namespace']
def create_experiment(
- self,
- name: str,
- description: str = None,
- namespace: str = None) -> kfp_server_api.ApiExperiment:
+ self,
+ name: str,
+ description: str = None,
+ namespace: str = None,
+ ) -> kfp_server_api.V2beta1Experiment:
"""Creates a new experiment.
Args:
@@ -445,7 +448,7 @@ def create_experiment(
namespace: Kubernetes namespace to use. Used for multi-user deployments. For single-user deployments, this should be left as ``None``.
Returns:
- ``ApiExperiment`` object.
+ ``V2beta1Experiment`` object.
"""
namespace = namespace or self.get_user_namespace()
experiment = None
@@ -460,21 +463,14 @@ def create_experiment(
if not experiment:
logging.info(f'Creating experiment {name}.')
- resource_references = []
- if namespace is not None:
- key = kfp_server_api.ApiResourceKey(
- id=namespace, type=kfp_server_api.ApiResourceType.NAMESPACE)
- reference = kfp_server_api.ApiResourceReference(
- key=key, relationship=kfp_server_api.ApiRelationship.OWNER)
- resource_references.append(reference)
-
- experiment = kfp_server_api.ApiExperiment(
- name=name,
+ experiment = kfp_server_api.V2beta1Experiment(
+ display_name=name,
description=description,
- resource_references=resource_references)
+ namespace=namespace,
+ )
experiment = self._experiment_api.create_experiment(body=experiment)
- link = f'{self._get_url_prefix()}/#/experiments/details/{experiment.id}'
+ link = f'{self._get_url_prefix()}/#/experiments/details/{experiment.experiment_id}'
if self._is_ipython():
import IPython
html = f'Experiment details.'
@@ -495,8 +491,8 @@ def get_pipeline_id(self, name: str) -> Optional[str]:
"""
pipeline_filter = json.dumps({
'predicates': [{
- 'op': _FILTER_OPERATIONS['EQUALS'],
- 'key': 'name',
+ 'operation': _FILTER_OPERATIONS['EQUALS'],
+ 'key': 'display_name',
'stringValue': name,
}]
})
@@ -504,7 +500,7 @@ def get_pipeline_id(self, name: str) -> Optional[str]:
if result.pipelines is None:
return None
if len(result.pipelines) == 1:
- return result.pipelines[0].id
+ return result.pipelines[0].pipeline_id
elif len(result.pipelines) > 1:
raise ValueError(
f'Multiple pipelines with the name: {name} found, the name needs to be unique.'
@@ -517,46 +513,46 @@ def list_experiments(
page_size: int = 10,
sort_by: str = '',
namespace: Optional[str] = None,
- filter: Optional[str] = None
- ) -> kfp_server_api.ApiListExperimentsResponse:
+ filter: Optional[str] = None,
+ ) -> kfp_server_api.V2beta1ListExperimentsResponse:
"""Lists experiments.
Args:
page_token: Page token for obtaining page from paginated response.
page_size: Size of the page.
- sort_by: Sort string of format ``'[field_name]', '[field_name] desc'``. For example, ``'name desc'``.
+ sort_by: Sort string of format ``'[field_name]', '[field_name] desc'``. For example, ``'display_name desc'``.
namespace: Kubernetes namespace to use. Used for multi-user deployments. For single-user deployments, this should be left as ``None``.
filter: A url-encoded, JSON-serialized Filter protocol buffer
- (see `filter.proto message `_). For a list of all filter operations ``'op'``, see `here `_. Example:
+ (see `filter.proto message `_). Example:
::
json.dumps({
"predicates": [{
- "op": _FILTER_OPERATIONS["EQUALS"],
- "key": "name",
+ "operation": "EQUALS",
+ "key": "display_name",
"stringValue": "my-name",
}]
})
Returns:
- ``ApiListExperimentsResponse`` object.
+ ``V2beta1ListExperimentsResponse`` object.
"""
namespace = namespace or self.get_user_namespace()
- return self._experiment_api.list_experiment(
+ return self._experiment_api.list_experiments(
page_token=page_token,
page_size=page_size,
sort_by=sort_by,
- resource_reference_key_type=kfp_server_api.ApiResourceType
- .NAMESPACE,
- resource_reference_key_id=namespace,
- filter=filter)
+ filter=filter,
+ namespace=namespace,
+ )
def get_experiment(
- self,
- experiment_id: Optional[str] = None,
- experiment_name: Optional[str] = None,
- namespace: Optional[str] = None) -> kfp_server_api.ApiExperiment:
+ self,
+ experiment_id: Optional[str] = None,
+ experiment_name: Optional[str] = None,
+ namespace: Optional[str] = None,
+ ) -> kfp_server_api.V2beta1Experiment:
"""Gets details of an experiment.
Either ``experiment_id`` or ``experiment_name`` is required.
@@ -564,32 +560,31 @@ def get_experiment(
Args:
experiment_id: ID of the experiment.
experiment_name: Name of the experiment.
- namespace: Kubernetes namespace to use. Used for multi-user deployments. For single-user deployments, this should be left as ``None``.
+ namespace: Kubernetes namespace to use. Used for multi-user deployments.
+ For single-user deployments, this should be left as ``None``.
Returns:
- ``ApiExperiment`` object.
+ ``V2beta1Experiment`` object.
"""
namespace = namespace or self.get_user_namespace()
if experiment_id is None and experiment_name is None:
raise ValueError(
'Either experiment_id or experiment_name is required.')
if experiment_id is not None:
- return self._experiment_api.get_experiment(id=experiment_id)
+ return self._experiment_api.get_experiment(
+ experiment_id=experiment_id)
experiment_filter = json.dumps({
'predicates': [{
- 'op': _FILTER_OPERATIONS['EQUALS'],
- 'key': 'name',
+ 'operation': _FILTER_OPERATIONS['EQUALS'],
+ 'key': 'display_name',
'stringValue': experiment_name,
}]
})
if namespace is not None:
- result = self._experiment_api.list_experiment(
- filter=experiment_filter,
- resource_reference_key_type=kfp_server_api.ApiResourceType
- .NAMESPACE,
- resource_reference_key_id=namespace)
+ result = self._experiment_api.list_experiments(
+ filter=experiment_filter, namespace=namespace)
else:
- result = self._experiment_api.list_experiment(
+ result = self._experiment_api.list_experiments(
filter=experiment_filter)
if not result.experiments:
raise ValueError(
@@ -608,7 +603,8 @@ def archive_experiment(self, experiment_id: str) -> dict:
Returns:
Empty dictionary.
"""
- return self._experiment_api.archive_experiment(id=experiment_id)
+ return self._experiment_api.archive_experiment(
+ experiment_id=experiment_id)
def unarchive_experiment(self, experiment_id: str) -> dict:
"""Unarchives an experiment.
@@ -619,7 +615,8 @@ def unarchive_experiment(self, experiment_id: str) -> dict:
Returns:
Empty dictionary.
"""
- return self._experiment_api.unarchive_experiment(id=experiment_id)
+ return self._experiment_api.unarchive_experiment(
+ experiment_id=experiment_id)
def delete_experiment(self, experiment_id: str) -> dict:
"""Delete experiment.
@@ -630,7 +627,8 @@ def delete_experiment(self, experiment_id: str) -> dict:
Returns:
Empty dictionary.
"""
- return self._experiment_api.delete_experiment(id=experiment_id)
+ return self._experiment_api.delete_experiment(
+ experiment_id=experiment_id)
def _extract_pipeline_yaml(self, package_file: str) -> dict:
@@ -672,8 +670,11 @@ def _choose_pipeline_file(file_list: List[str]) -> str:
f'The package_file {package_file} should end with one of the '
'following formats: [.tar.gz, .tgz, .zip, .yaml, .yml].')
- def _override_caching_options(self, pipeline_obj: dict,
- enable_caching: bool) -> None:
+ def _override_caching_options(
+ self,
+ pipeline_obj: dict,
+ enable_caching: bool,
+ ) -> None:
"""Overrides caching options.
Args:
@@ -691,31 +692,33 @@ def list_pipelines(
page_token: str = '',
page_size: int = 10,
sort_by: str = '',
- filter: Optional[str] = None
- ) -> kfp_server_api.ApiListPipelinesResponse:
+ filter: Optional[str] = None,
+ namespace: Optional[str] = None,
+ ) -> kfp_server_api.V2beta1ListPipelinesResponse:
"""Lists pipelines.
Args:
page_token: Page token for obtaining page from paginated response.
page_size: Size of the page.
- sort_by: Sort string of format ``'[field_name]', '[field_name] desc'``. For example, ``'name desc'``.
+ sort_by: Sort string of format ``'[field_name]', '[field_name] desc'``. For example, ``'display_name desc'``.
filter: A url-encoded, JSON-serialized Filter protocol buffer
- (see `filter.proto message `_). For a list of all filter operations ``'op'``, see `here `_. Example:
+ (see `filter.proto message `_). Example:
::
json.dumps({
"predicates": [{
- "op": _FILTER_OPERATIONS["EQUALS"],
- "key": "name",
+ "operation": "EQUALS",
+ "key": "display_name",
"stringValue": "my-name",
}]
})
Returns:
- ``ApiListPipelinesResponse`` object.
+ ``V2beta1ListPipelinesResponse`` object.
"""
return self._pipelines_api.list_pipelines(
+ namespace=namespace,
page_token=page_token,
page_size=page_size,
sort_by=sort_by,
@@ -733,7 +736,7 @@ def run_pipeline(
pipeline_root: Optional[str] = None,
enable_caching: Optional[bool] = None,
service_account: Optional[str] = None,
- ) -> kfp_server_api.ApiRun:
+ ) -> kfp_server_api.V2beta1Run:
"""Runs a specified pipeline.
Args:
@@ -759,13 +762,9 @@ def run_pipeline(
account to use for this run.
Returns:
- ``ApiRun`` object.
+ ``V2beta1Run`` object.
"""
- if params is None:
- params = {}
-
job_config = self._create_job_config(
- experiment_id=experiment_id,
params=params,
pipeline_package_path=pipeline_package_path,
pipeline_id=pipeline_id,
@@ -773,15 +772,18 @@ def run_pipeline(
enable_caching=enable_caching,
pipeline_root=pipeline_root,
)
- run_body = kfp_server_api.ApiRun(
- pipeline_spec=job_config.spec,
- resource_references=job_config.resource_references,
- name=job_name,
+
+ run_body = kfp_server_api.V2beta1Run(
+ experiment_id=experiment_id,
+ display_name=job_name,
+ pipeline_spec=job_config.pipeline_spec,
+ pipeline_version_reference=job_config.pipeline_version_reference,
+ runtime_config=job_config.runtime_config,
service_account=service_account)
response = self._run_api.create_run(body=run_body)
- link = f'{self._get_url_prefix()}/#/runs/details/{response.run.id}'
+ link = f'{self._get_url_prefix()}/#/runs/details/{response.run_id}'
if self._is_ipython():
import IPython
html = (f'Run details.')
@@ -789,7 +791,7 @@ def run_pipeline(
else:
print(f'Run details: {link}')
- return response.run
+ return response
def archive_run(self, run_id: str) -> dict:
"""Archives a run.
@@ -800,7 +802,7 @@ def archive_run(self, run_id: str) -> dict:
Returns:
Empty dictionary.
"""
- return self._run_api.archive_run(id=run_id)
+ return self._run_api.archive_run(run_id=run_id)
def unarchive_run(self, run_id: str) -> dict:
"""Restores an archived run.
@@ -811,7 +813,7 @@ def unarchive_run(self, run_id: str) -> dict:
Returns:
Empty dictionary.
"""
- return self._run_api.unarchive_run(id=run_id)
+ return self._run_api.unarchive_run(run_id=run_id)
def delete_run(self, run_id: str) -> dict:
"""Deletes a run.
@@ -822,7 +824,7 @@ def delete_run(self, run_id: str) -> dict:
Returns:
Empty dictionary.
"""
- return self._run_api.delete_run(id=run_id)
+ return self._run_api.delete_run(run_id=run_id)
def terminate_run(self, run_id: str) -> dict:
"""Terminates a run.
@@ -854,7 +856,7 @@ def create_recurring_run(
pipeline_root: Optional[str] = None,
enable_caching: Optional[bool] = None,
service_account: Optional[str] = None,
- ) -> kfp_server_api.ApiJob:
+ ) -> kfp_server_api.V2beta1RecurringRun:
"""Creates a recurring run.
Args:
@@ -900,11 +902,10 @@ def create_recurring_run(
service_account: Specifies which Kubernetes service
account this recurring run uses.
Returns:
- ``ApiJob`` object.
+ ``V2beta1RecurringRun`` object.
"""
job_config = self._create_job_config(
- experiment_id=experiment_id,
params=params,
pipeline_package_path=pipeline_package_path,
pipeline_id=pipeline_id,
@@ -918,44 +919,48 @@ def create_recurring_run(
raise ValueError(
'Either interval_second or cron_expression is required.')
if interval_second is not None:
- trigger = kfp_server_api.ApiTrigger(
- periodic_schedule=kfp_server_api.ApiPeriodicSchedule(
+ trigger = kfp_server_api.V2beta1Trigger(
+ periodic_schedule=kfp_server_api.V2beta1PeriodicSchedule(
start_time=start_time,
end_time=end_time,
interval_second=interval_second))
if cron_expression is not None:
- trigger = kfp_server_api.ApiTrigger(
- cron_schedule=kfp_server_api.ApiCronSchedule(
+ trigger = kfp_server_api.V2beta1Trigger(
+ cron_schedule=kfp_server_api.V2beta1CronSchedule(
start_time=start_time,
end_time=end_time,
cron=cron_expression))
- job_body = kfp_server_api.ApiJob(
- enabled=enabled,
- pipeline_spec=job_config.spec,
- resource_references=job_config.resource_references,
- name=job_name,
+ mode = kfp_server_api.RecurringRunMode.DISABLE
+ if enabled:
+ mode = kfp_server_api.RecurringRunMode.ENABLE
+
+ job_body = kfp_server_api.V2beta1RecurringRun(
+ experiment_id=experiment_id,
+ mode=mode,
+ pipeline_spec=job_config.pipeline_spec,
+ pipeline_version_reference=job_config.pipeline_version_reference,
+ runtime_config=job_config.runtime_config,
+ display_name=job_name,
description=description,
no_catchup=no_catchup,
trigger=trigger,
max_concurrency=max_concurrency,
service_account=service_account)
- return self._job_api.create_job(body=job_body)
+ return self._recurring_run_api.create_recurring_run(body=job_body)
def _create_job_config(
self,
- experiment_id: str,
params: Optional[Dict[str, Any]],
pipeline_package_path: Optional[str],
pipeline_id: Optional[str],
version_id: Optional[str],
enable_caching: Optional[bool],
pipeline_root: Optional[str],
- ) -> JobConfig:
+ ) -> _JobConfig:
"""Creates a JobConfig with spec and resource_references.
Args:
- experiment_id: ID of an experiment.
pipeline_package_path: Local path of the pipeline package (the
filename should end with one of the following .tar.gz, .tgz,
.zip, .yaml, .yml).
@@ -974,11 +979,24 @@ def _create_job_config(
pipeline_root: Root path of the pipeline outputs.
Returns:
- A JobConfig object with attributes .spec and .resource_reference.
+ A _JobConfig object with attributes .pipeline_spec,
+ .pipeline_version_reference, and .runtime_config.
"""
+ from_spec = pipeline_package_path is not None
+ from_template = pipeline_id is not None or version_id is not None
+ if from_spec == from_template:
+ raise ValueError(
            'Must specify either `pipeline_package_path` or both `pipeline_id` and `version_id`.'
+ )
+ if (pipeline_id is None) != (version_id is None):
+ raise ValueError(
+ 'To run a pipeline from an existing template, both `pipeline_id` and `version_id` are required.'
+ )
+
+ if params is None:
+ params = {}
- params = params or {}
- pipeline_yaml_string = None
+ pipeline_obj = None
if pipeline_package_path:
pipeline_obj = self._extract_pipeline_yaml(pipeline_package_path)
@@ -987,33 +1005,20 @@ def _create_job_config(
if enable_caching is not None:
self._override_caching_options(pipeline_obj, enable_caching)
- pipeline_yaml_string = yaml.dump(pipeline_obj, sort_keys=True)
+ pipeline_version_reference = None
+ if pipeline_id is not None and version_id is not None:
+ pipeline_version_reference = kfp_server_api.V2beta1PipelineVersionReference(
+ pipeline_id=pipeline_id, pipeline_version_id=version_id)
- runtime_config = kfp_server_api.PipelineSpecRuntimeConfig(
+ runtime_config = kfp_server_api.V2beta1RuntimeConfig(
pipeline_root=pipeline_root,
parameters=params,
)
- resource_references = []
- key = kfp_server_api.ApiResourceKey(
- id=experiment_id, type=kfp_server_api.ApiResourceType.EXPERIMENT)
- reference = kfp_server_api.ApiResourceReference(
- key=key, relationship=kfp_server_api.ApiRelationship.OWNER)
- resource_references.append(reference)
-
- if version_id:
- key = kfp_server_api.ApiResourceKey(
- id=version_id,
- type=kfp_server_api.ApiResourceType.PIPELINE_VERSION)
- reference = kfp_server_api.ApiResourceReference(
- key=key, relationship=kfp_server_api.ApiRelationship.CREATOR)
- resource_references.append(reference)
-
- spec = kfp_server_api.ApiPipelineSpec(
- pipeline_id=pipeline_id,
- pipeline_manifest=pipeline_yaml_string,
+ return _JobConfig(
+ pipeline_spec=pipeline_obj,
+ pipeline_version_reference=pipeline_version_reference,
runtime_config=runtime_config,
)
- return JobConfig(spec=spec, resource_references=resource_references)
def create_run_from_pipeline_func(
self,
@@ -1133,7 +1138,7 @@ def create_run_from_pipeline_package(
experiment_name = overridden_experiment_name or 'Default'
experiment = self.create_experiment(
name=experiment_name, namespace=namespace)
- experiment_id = experiment.id
+ experiment_id = experiment.experiment_id
run_name = run_name or (
pipeline_name + ' ' +
@@ -1159,7 +1164,24 @@ def delete_job(self, job_id: str) -> dict:
Returns:
Empty dictionary.
"""
- return self._job_api.delete_job(id=job_id)
+ warnings.warn(
+ '`delete_job` is deprecated. Please use `delete_recurring_run` instead.'
+ f'\nReroute to calling `delete_recurring_run(recurring_run_id="{job_id}")`',
+ category=DeprecationWarning,
+ stacklevel=2)
+ return self.delete_recurring_run(recurring_run_id=job_id)
+
+ def delete_recurring_run(self, recurring_run_id: str) -> dict:
+ """Deletes a recurring run.
+
+ Args:
+ recurring_run_id: ID of the recurring_run.
+
+ Returns:
+ Empty dictionary.
+ """
+ return self._recurring_run_api.delete_recurring_run(
+ recurring_run_id=recurring_run_id)
def disable_job(self, job_id: str) -> dict:
"""Disables a job (recurring run).
@@ -1170,7 +1192,24 @@ def disable_job(self, job_id: str) -> dict:
Returns:
Empty dictionary.
"""
- return self._job_api.disable_job(id=job_id)
+ warnings.warn(
+ '`disable_job` is deprecated. Please use `disable_recurring_run` instead.'
+ f'\nReroute to calling `disable_recurring_run(recurring_run_id="{job_id}")`',
+ category=DeprecationWarning,
+ stacklevel=2)
+ return self.disable_recurring_run(recurring_run_id=job_id)
+
+ def disable_recurring_run(self, recurring_run_id: str) -> dict:
+ """Disables a recurring run.
+
+ Args:
+ recurring_run_id: ID of the recurring_run.
+
+ Returns:
+ Empty dictionary.
+ """
+ return self._recurring_run_api.disable_recurring_run(
+ recurring_run_id=recurring_run_id)
def enable_job(self, job_id: str) -> dict:
"""Enables a job (recurring run).
@@ -1181,39 +1220,57 @@ def enable_job(self, job_id: str) -> dict:
Returns:
Empty dictionary.
"""
- return self._job_api.enable_job(id=job_id)
+ warnings.warn(
+ '`enable_job` is deprecated. Please use `enable_recurring_run` instead.'
+ f'\nReroute to calling `enable_recurring_run(recurring_run_id="{job_id}")`',
+ category=DeprecationWarning,
+ stacklevel=2)
+ return self.enable_recurring_run(recurring_run_id=job_id)
+
+ def enable_recurring_run(self, recurring_run_id: str) -> dict:
+ """Enables a recurring run.
+
+ Args:
+ recurring_run_id: ID of the recurring_run.
+
+ Returns:
+ Empty dictionary.
+ """
+ return self._recurring_run_api.enable_recurring_run(
+ recurring_run_id=recurring_run_id)
def list_runs(
- self,
- page_token: str = '',
- page_size: int = 10,
- sort_by: str = '',
- experiment_id: Optional[str] = None,
- namespace: Optional[str] = None,
- filter: Optional[str] = None) -> kfp_server_api.ApiListRunsResponse:
+ self,
+ page_token: str = '',
+ page_size: int = 10,
+ sort_by: str = '',
+ experiment_id: Optional[str] = None,
+ namespace: Optional[str] = None,
+ filter: Optional[str] = None,
+ ) -> kfp_server_api.V2beta1ListRunsResponse:
"""List runs.
Args:
page_token: Page token for obtaining page from paginated response.
page_size: Size of the page.
- sort_by: Sort string of format ``'[field_name]', '[field_name] desc'``. For example, ``'name desc'``.
+ sort_by: Sort string of format ``'[field_name]', '[field_name] desc'``. For example, ``'display_name desc'``.
experiment_id: Experiment ID to filter upon
namespace: Kubernetes namespace to use. Used for multi-user deployments. For single-user deployments, this should be left as ``None``.
filter: A url-encoded, JSON-serialized Filter protocol buffer
- (see `filter.proto message `_). For a list of all filter operations ``'op'``, see `here `_. Example:
+            (see `filter.proto message `_). For a list of all filter operations ``'operation'``, see `here `_. Example:
::
json.dumps({
"predicates": [{
- "op": _FILTER_OPERATIONS["EQUALS"],
- "key": "name",
+ "operation": "EQUALS",
+ "key": "display_name",
"stringValue": "my-name",
}]
})
Returns:
- ``ApiListRunsResponse`` object.
+ ``V2beta1ListRunsResponse`` object.
"""
namespace = namespace or self.get_user_namespace()
if experiment_id is not None:
@@ -1221,9 +1278,7 @@ def list_runs(
page_token=page_token,
page_size=page_size,
sort_by=sort_by,
- resource_reference_key_type=kfp_server_api.ApiResourceType
- .EXPERIMENT,
- resource_reference_key_id=experiment_id,
+ experiment_id=experiment_id,
filter=filter)
elif namespace is not None:
@@ -1231,9 +1286,7 @@ def list_runs(
page_token=page_token,
page_size=page_size,
sort_by=sort_by,
- resource_reference_key_type=kfp_server_api.ApiResourceType
- .NAMESPACE,
- resource_reference_key_id=namespace,
+ namespace=namespace,
filter=filter)
else:
@@ -1244,79 +1297,102 @@ def list_runs(
filter=filter)
def list_recurring_runs(
- self,
- page_token: str = '',
- page_size: int = 10,
- sort_by: str = '',
- experiment_id: Optional[str] = None,
- filter: Optional[str] = None) -> kfp_server_api.ApiListJobsResponse:
+ self,
+ page_token: str = '',
+ page_size: int = 10,
+ sort_by: str = '',
+ experiment_id: Optional[str] = None,
+ namespace: Optional[str] = None,
+ filter: Optional[str] = None,
+ ) -> kfp_server_api.V2beta1ListRecurringRunsResponse:
"""Lists recurring runs.
Args:
page_token: Page token for obtaining page from paginated response.
page_size: Size of the page.
- sort_by: Sort string of format ``'[field_name]', '[field_name] desc'``. For example, ``'name desc'``.
+ sort_by: Sort string of format ``'[field_name]', '[field_name] desc'``. For example, ``'display_name desc'``.
experiment_id: Experiment ID to filter upon.
+ namespace: Kubernetes namespace to use. Used for multi-user deployments. For single-user deployments, this should be left as ``None``.
filter: A url-encoded, JSON-serialized Filter protocol buffer
- (see `filter.proto message `_). For a list of all filter operations ``'op'``, see `here `_. Example:
+ (see `filter.proto message `_). Example:
::
json.dumps({
"predicates": [{
- "op": _FILTER_OPERATIONS["EQUALS"],
- "key": "name",
+ "operation": "EQUALS",
+ "key": "display_name",
"stringValue": "my-name",
}]
})
Returns:
- ``ApiListJobsResponse`` object.
+ ``V2beta1ListRecurringRunsResponse`` object.
"""
if experiment_id is not None:
- return self._job_api.list_jobs(
+ return self._recurring_run_api.list_recurring_runs(
+ page_token=page_token,
+ page_size=page_size,
+ sort_by=sort_by,
+ experiment_id=experiment_id,
+ filter=filter)
+
+ elif namespace is not None:
+ return self._recurring_run_api.list_recurring_runs(
page_token=page_token,
page_size=page_size,
sort_by=sort_by,
- resource_reference_key_type=kfp_server_api.ApiResourceType
- .EXPERIMENT,
- resource_reference_key_id=experiment_id,
+ namespace=namespace,
filter=filter)
else:
- return self._job_api.list_jobs(
+ return self._recurring_run_api.list_recurring_runs(
page_token=page_token,
page_size=page_size,
sort_by=sort_by,
filter=filter)
- def get_recurring_run(self, job_id: str) -> kfp_server_api.ApiJob:
- """Gets recurring run (job) details.
+ def get_recurring_run(
+ self,
+ recurring_run_id: str,
+ job_id: Optional[str] = None,
+ ) -> kfp_server_api.V2beta1RecurringRun:
+ """Gets recurring run details.
Args:
- job_id: ID of the recurring run (job).
+ recurring_run_id: ID of the recurring run.
+ job_id: Deprecated. Use `recurring_run_id` instead.
Returns:
- ``ApiJob`` object.
+ ``V2beta1RecurringRun`` object.
"""
- return self._job_api.get_job(id=job_id)
+ if job_id is not None:
+ warnings.warn(
+ '`job_id` is deprecated. Please use `recurring_run_id` instead.',
+ category=DeprecationWarning,
+ stacklevel=2)
+ recurring_run_id = recurring_run_id or job_id
+
+ return self._recurring_run_api.get_recurring_run(
+ recurring_run_id=recurring_run_id)
- def get_run(self, run_id: str) -> kfp_server_api.ApiRun:
+ def get_run(self, run_id: str) -> kfp_server_api.V2beta1Run:
"""Gets run details.
Args:
run_id: ID of the run.
Returns:
- ``ApiRun`` object.
+ ``V2beta1Run`` object.
"""
return self._run_api.get_run(run_id=run_id)
def wait_for_run_completion(
- self,
- run_id: str,
- timeout: int,
- sleep_duration: int = 5) -> kfp_server_api.ApiRun:
+ self,
+ run_id: str,
+ timeout: int,
+ sleep_duration: int = 5,
+ ) -> kfp_server_api.V2beta1Run:
"""Waits for a run to complete.
Args:
@@ -1325,14 +1401,14 @@ def wait_for_run_completion(
sleep_duration: Time in seconds between retries.
Returns:
- ``ApiRun`` object.
+ ``V2beta1Run`` object.
"""
- status = 'Running:'
+ state = 'Running:'
start_time = datetime.datetime.now()
if isinstance(timeout, datetime.timedelta):
timeout = timeout.total_seconds()
is_valid_token = False
- while (status is None or status.lower()
+ while (state is None or state.lower()
not in ['succeeded', 'failed', 'skipped', 'error']):
try:
get_run_response = self._run_api.get_run(run_id=run_id)
@@ -1346,7 +1422,7 @@ def wait_for_run_completion(
continue
else:
raise api_ex
- status = get_run_response.run.status
+ state = get_run_response.state
elapsed_time = (datetime.datetime.now() -
start_time).total_seconds()
logging.info('Waiting for the job to complete...')
@@ -1355,43 +1431,36 @@ def wait_for_run_completion(
time.sleep(sleep_duration)
return get_run_response
- def _get_workflow_json(self, run_id: str) -> dict:
- """Gets the workflow json.
-
- Args:
- run_id: run id, returned from run_pipeline.
-
- Returns:
- Workflow JSON.
- """
- get_run_response = self._run_api.get_run(run_id=run_id)
- workflow = get_run_response.pipeline_runtime.workflow_manifest
- return json.loads(workflow)
-
def upload_pipeline(
self,
pipeline_package_path: str,
pipeline_name: Optional[str] = None,
description: Optional[str] = None,
- ) -> kfp_server_api.ApiPipeline:
+ namespace: Optional[str] = None,
+ ) -> kfp_server_api.V2beta1Pipeline:
"""Uploads a pipeline.
Args:
pipeline_package_path: Local path to the pipeline package.
pipeline_name: Name of the pipeline to be shown in the UI.
- description: Description of the pipeline to be shown in
- the UI.
+ description: Description of the pipeline to be shown in the UI.
+ namespace: Optional. Kubernetes namespace where the pipeline should
+ be uploaded. For single user deployment, leave it as None; For
+ multi user, input a namespace where the user is authorized.
Returns:
- ``ApiPipeline`` object.
+ ``V2beta1Pipeline`` object.
"""
if pipeline_name is None:
pipeline_yaml = self._extract_pipeline_yaml(pipeline_package_path)
pipeline_name = pipeline_yaml['pipelineInfo']['name']
validate_pipeline_resource_name(pipeline_name)
response = self._upload_api.upload_pipeline(
- pipeline_package_path, name=pipeline_name, description=description)
- link = f'{self._get_url_prefix()}/#/pipelines/details/{response.id}'
+ pipeline_package_path,
+ name=pipeline_name,
+ description=description,
+ namespace=namespace)
+ link = f'{self._get_url_prefix()}/#/pipelines/details/{response.pipeline_id}'
if self._is_ipython():
import IPython
html = f'Pipeline details.'
@@ -1408,7 +1477,7 @@ def upload_pipeline_version(
pipeline_id: Optional[str] = None,
pipeline_name: Optional[str] = None,
description: Optional[str] = None,
- ) -> kfp_server_api.ApiPipelineVersion:
+ ) -> kfp_server_api.V2beta1PipelineVersion:
"""Uploads a new version of the pipeline.
Args:
@@ -1420,7 +1489,7 @@ def upload_pipeline_version(
description: Description of the pipeline version to show in the UI.
Returns:
- ``ApiPipelineVersion`` object.
+ ``V2beta1PipelineVersion`` object.
"""
if all([pipeline_id, pipeline_name
@@ -1440,7 +1509,7 @@ def upload_pipeline_version(
response = self._upload_api.upload_pipeline_version(
pipeline_package_path, **kwargs)
- link = f'{self._get_url_prefix()}/#/pipelines/details/{response.id}'
+ link = f'{self._get_url_prefix()}/#/pipelines/details/{response.pipeline_id}/version/{response.pipeline_version_id}'
if self._is_ipython():
import IPython
html = f'Pipeline details.'
@@ -1450,16 +1519,16 @@ def upload_pipeline_version(
return response
- def get_pipeline(self, pipeline_id: str) -> kfp_server_api.ApiPipeline:
+ def get_pipeline(self, pipeline_id: str) -> kfp_server_api.V2beta1Pipeline:
"""Gets pipeline details.
Args:
pipeline_id: ID of the pipeline.
Returns:
- ``ApiPipeline`` object.
+ ``V2beta1Pipeline`` object.
"""
- return self._pipelines_api.get_pipeline(id=pipeline_id)
+ return self._pipelines_api.get_pipeline(pipeline_id=pipeline_id)
def delete_pipeline(self, pipeline_id: str) -> dict:
"""Deletes a pipeline.
@@ -1470,7 +1539,7 @@ def delete_pipeline(self, pipeline_id: str) -> dict:
Returns:
Empty dictionary.
"""
- return self._pipelines_api.delete_pipeline(id=pipeline_id)
+ return self._pipelines_api.delete_pipeline(pipeline_id=pipeline_id)
def list_pipeline_versions(
self,
@@ -1478,67 +1547,83 @@ def list_pipeline_versions(
page_token: str = '',
page_size: int = 10,
sort_by: str = '',
- filter: Optional[str] = None
- ) -> kfp_server_api.ApiListPipelineVersionsResponse:
+ filter: Optional[str] = None,
+ ) -> kfp_server_api.V2beta1ListPipelineVersionsResponse:
"""Lists pipeline versions.
Args:
pipeline_id: ID of the pipeline for which to list versions.
page_token: Page token for obtaining page from paginated response.
page_size: Size of the page.
- sort_by: Sort string of format ``'[field_name]', '[field_name] desc'``. For example, ``'name desc'``.
+ sort_by: Sort string of format ``'[field_name]', '[field_name] desc'``. For example, ``'display_name desc'``.
filter: A url-encoded, JSON-serialized Filter protocol buffer
- (see `filter.proto message `_). For a list of all filter operations ``'op'``, see `here `_. Example:
+ (see `filter.proto message `_). Example:
::
json.dumps({
"predicates": [{
- "op": _FILTER_OPERATIONS["EQUALS"],
- "key": "name",
+ "operation": "EQUALS",
+ "key": "display_name",
"stringValue": "my-name",
}]
})
Returns:
- ``ApiListPipelineVersionsResponse`` object.
+ ``V2beta1ListPipelineVersionsResponse`` object.
"""
return self._pipelines_api.list_pipeline_versions(
page_token=page_token,
page_size=page_size,
sort_by=sort_by,
- resource_key_type=kfp_server_api.ApiResourceType.PIPELINE,
- resource_key_id=pipeline_id,
+ pipeline_id=pipeline_id,
filter=filter)
def get_pipeline_version(
- self, version_id: str) -> kfp_server_api.ApiPipelineVersion:
+ self,
+ pipeline_id: str,
+ pipeline_version_id: str,
+ ) -> kfp_server_api.V2beta1PipelineVersion:
"""Gets a pipeline version.
Args:
- version_id: ID of the pipeline version.
+ pipeline_id: ID of the pipeline.
+ pipeline_version_id: ID of the pipeline version.
Returns:
- ``ApiPipelineVersion`` object.
+ ``V2beta1PipelineVersion`` object.
"""
- return self._pipelines_api.get_pipeline_version(version_id=version_id)
+ return self._pipelines_api.get_pipeline_version(
+ pipeline_id=pipeline_id,
+ pipeline_version_id=pipeline_version_id,
+ )
- def delete_pipeline_version(self, version_id: str) -> dict:
- """Deletes a pipeline version.
+ def delete_pipeline_version(
+ self,
+ pipeline_id: str,
+ pipeline_version_id: str,
+ ) -> dict:
+        """Deletes a pipeline version.
Args:
- version_id: ID of the pipeline version.
+ pipeline_id: ID of the pipeline.
+ pipeline_version_id: ID of the pipeline version.
Returns:
Empty dictionary.
"""
return self._pipelines_api.delete_pipeline_version(
- version_id=version_id)
+ pipeline_id=pipeline_id,
+ pipeline_version_id=pipeline_version_id,
+ )
-def _add_generated_apis(target_struct: Any, api_module: ModuleType,
- api_client: kfp_server_api.ApiClient) -> None:
+def _add_generated_apis(
+ target_struct: Any,
+ api_module: ModuleType,
+ api_client: kfp_server_api.ApiClient,
+) -> None:
"""Initializes a hierarchical API object based on the generated API module.
PipelineServiceApi.create_pipeline becomes
diff --git a/sdk/python/kfp/client/client_test.py b/sdk/python/kfp/client/client_test.py
index 1b17e762377..8a735d01dbd 100644
--- a/sdk/python/kfp/client/client_test.py
+++ b/sdk/python/kfp/client/client_test.py
@@ -74,7 +74,7 @@ def pipeline_hello_world(text: str = 'hi there'):
with open(temp_filepath, 'r') as f:
pipeline_obj = yaml.safe_load(f)
- test_client = client.Client(namespace='foo')
+ test_client = client.Client(namespace='ns1')
test_client._override_caching_options(pipeline_obj, False)
for _, task in pipeline_obj['root']['dag']['tasks'].items():
self.assertFalse(task['cachingOptions']['enableCache'])
@@ -106,7 +106,7 @@ def pipeline_with_two_component(text: str = 'hi there'):
with open(temp_filepath, 'r') as f:
pipeline_obj = yaml.safe_load(f)
- test_client = client.Client(namespace='foo')
+ test_client = client.Client(namespace='ns1')
test_client._override_caching_options(pipeline_obj, False)
self.assertFalse(
pipeline_obj['root']['dag']['tasks']['hello-word']
@@ -118,7 +118,7 @@ def pipeline_with_two_component(text: str = 'hi there'):
class TestClient(unittest.TestCase):
def setUp(self):
- self.client = client.Client(namespace='foo')
+ self.client = client.Client(namespace='ns1')
def test__is_ipython_return_false(self):
mock = MagicMock()
@@ -152,9 +152,9 @@ def test_wait_for_run_completion_expired_access_token(self):
with patch.object(self.client._run_api, 'get_run') as mock_get_run:
# We need to iterate through multiple side effects in order to test this logic.
mock_get_run.side_effect = [
- Mock(run=Mock(status='foo')),
+ Mock(state='unknown state'),
kfp_server_api.ApiException(status=401),
- Mock(run=Mock(status='succeeded')),
+ Mock(state='succeeded'),
]
with patch.object(self.client, '_refresh_api_client_token'
@@ -166,7 +166,7 @@ def test_wait_for_run_completion_expired_access_token(self):
def test_wait_for_run_completion_valid_token(self):
with patch.object(self.client._run_api, 'get_run') as mock_get_run:
- mock_get_run.return_value = Mock(run=Mock(status='succeeded'))
+ mock_get_run.return_value = Mock(state='succeeded')
response = self.client.wait_for_run_completion(
run_id='foo', timeout=1, sleep_duration=0)
mock_get_run.assert_called_once_with(run_id='foo')
@@ -184,9 +184,9 @@ def test_wait_for_run_completion_run_timeout_should_raise_error(self):
def test_create_experiment_no_experiment_should_raise_error(
self, mock_get_experiment):
with self.assertRaises(ValueError):
- self.client.create_experiment(name='foo', namespace='foo')
+ self.client.create_experiment(name='foo', namespace='ns1')
mock_get_experiment.assert_called_once_with(
- name='foo', namespace='foo')
+ name='foo', namespace='ns1')
@patch('kfp.Client.get_experiment', return_value=Mock(id='foo'))
@patch('kfp.Client._get_url_prefix', return_value='/pipeline')
@@ -194,10 +194,10 @@ def test_create_experiment_existing_experiment(self, mock_get_url_prefix,
mock_get_experiment):
self.client.create_experiment(name='foo')
mock_get_experiment.assert_called_once_with(
- experiment_name='foo', namespace='foo')
+ experiment_name='foo', namespace='ns1')
mock_get_url_prefix.assert_called_once()
- @patch('kfp_server_api.ApiExperiment')
+ @patch('kfp_server_api.V2beta1Experiment')
@patch(
'kfp.Client.get_experiment',
side_effect=ValueError('No experiment is found with name'))
@@ -210,10 +210,11 @@ def test__create_experiment_name_not_found(self, mock_get_url_prefix,
with patch.object(
self.client._experiment_api,
'create_experiment',
- return_value=Mock(id='foo')) as mock_create_experiment:
+ return_value=Mock(
+ experiment_id='foo')) as mock_create_experiment:
self.client.create_experiment(name='foo')
mock_get_experiment.assert_called_once_with(
- experiment_name='foo', namespace='foo')
+ experiment_name='foo', namespace='ns1')
mock_api_experiment.assert_called_once()
mock_create_experiment.assert_called_once()
mock_get_url_prefix.assert_called_once()
@@ -228,11 +229,11 @@ def test_get_experiment_does_not_exist_should_raise_error(
with self.assertRaises(ValueError):
with patch.object(
self.client._experiment_api,
- 'list_experiment',
+ 'list_experiments',
return_value=Mock(
- experiments=None)) as mock_list_experiment:
+ experiments=None)) as mock_list_experiments:
self.client.get_experiment(experiment_name='foo')
- mock_list_experiment.assert_called_once()
+ mock_list_experiments.assert_called_once()
mock_get_user_namespace.assert_called_once()
@patch('kfp.Client.get_user_namespace', return_value=None)
@@ -241,32 +242,32 @@ def test_get_experiment_multiple_experiments_with_name_should_raise_error(
with self.assertRaises(ValueError):
with patch.object(
self.client._experiment_api,
- 'list_experiment',
+ 'list_experiments',
return_value=Mock(
- experiments=['foo', 'foo'])) as mock_list_experiment:
+ experiments=['foo', 'foo'])) as mock_list_experiments:
self.client.get_experiment(experiment_name='foo')
- mock_list_experiment.assert_called_once()
+ mock_list_experiments.assert_called_once()
mock_get_user_namespace.assert_called_once()
def test_get_experiment_with_experiment_id(self):
with patch.object(self.client._experiment_api,
'get_experiment') as mock_get_experiment:
self.client.get_experiment(experiment_id='foo')
- mock_get_experiment.assert_called_once_with(id='foo')
+ mock_get_experiment.assert_called_once_with(experiment_id='foo')
def test_get_experiment_with_experiment_name_and_namespace(self):
with patch.object(self.client._experiment_api,
- 'list_experiment') as mock_list_experiment:
- self.client.get_experiment(experiment_name='foo', namespace='foo')
- mock_list_experiment.assert_called_once()
+ 'list_experiments') as mock_list_experiments:
+ self.client.get_experiment(experiment_name='foo', namespace='ns1')
+ mock_list_experiments.assert_called_once()
@patch('kfp.Client.get_user_namespace', return_value=None)
def test_get_experiment_with_experiment_name_and_no_namespace(
self, mock_get_user_namespace):
with patch.object(self.client._experiment_api,
- 'list_experiment') as mock_list_experiment:
+ 'list_experiments') as mock_list_experiments:
self.client.get_experiment(experiment_name='foo')
- mock_list_experiment.assert_called_once()
+ mock_list_experiments.assert_called_once()
mock_get_user_namespace.assert_called_once()
@patch('kfp_server_api.HealthzServiceApi.get_healthz')
@@ -304,11 +305,13 @@ def pipeline_test_upload_without_name(boolean: bool = True):
package_path=pipeline_test_path)
self.client.upload_pipeline(
pipeline_package_path=pipeline_test_path,
- description='description')
+ description='description',
+ namespace='ns1')
mock_upload_pipeline.assert_called_once_with(
pipeline_test_path,
name='test-upload-without-name',
- description='description')
+ description='description',
+ namespace='ns1')
def test_upload_pipeline_with_name(self):
with patch.object(self.client._upload_api,
@@ -317,11 +320,13 @@ def test_upload_pipeline_with_name(self):
self.client.upload_pipeline(
pipeline_package_path='fake.yaml',
pipeline_name='overwritten-name',
- description='description')
+ description='description',
+ namespace='ns1')
mock_upload_pipeline.assert_called_once_with(
'fake.yaml',
name='overwritten-name',
- description='description')
+ description='description',
+ namespace='ns1')
if __name__ == '__main__':
diff --git a/sdk/python/requirements.in b/sdk/python/requirements.in
index c1bf3df8e51..ff4a8fbf83a 100644
--- a/sdk/python/requirements.in
+++ b/sdk/python/requirements.in
@@ -16,7 +16,7 @@ kfp-pipeline-spec==0.2.1
# kfp-server-api package is released.
# Update the lower version when kfp sdk depends on new apis/fields in
# kfp-server-api.
-kfp-server-api==2.0.0a6
+kfp-server-api==2.0.0b1
kubernetes>=8.0.0,<24
protobuf>=3.13.0,<4
PyYAML>=5.3,<7
diff --git a/sdk/python/requirements.txt b/sdk/python/requirements.txt
index 9690c000e22..8800074aefd 100644
--- a/sdk/python/requirements.txt
+++ b/sdk/python/requirements.txt
@@ -43,7 +43,7 @@ idna==3.3
# via requests
kfp-pipeline-spec==0.2.0
# via -r requirements.in
-kfp-server-api==2.0.0a6
+kfp-server-api==2.0.0b1
# via -r requirements.in
kubernetes==23.6.0
# via -r requirements.in