From 2228b72191d82c41d234f88973a9b687e4533f7b Mon Sep 17 00:00:00 2001 From: Jin Chi He Date: Thu, 2 Jan 2020 17:35:40 +0800 Subject: [PATCH] Add watch function for TFJob python Client API (#1122) --- pkg/apis/tensorflow/v1/openapi_generated.go | 2 +- pkg/apis/tensorflow/v1/types.go | 2 +- .../tensorflow/v1/zz_generated.deepcopy.go | 2 +- .../tensorflow/v1/zz_generated.defaults.go | 2 +- pkg/client/clientset/versioned/clientset.go | 2 +- pkg/client/clientset/versioned/doc.go | 2 +- .../versioned/fake/clientset_generated.go | 2 +- pkg/client/clientset/versioned/fake/doc.go | 2 +- .../clientset/versioned/fake/register.go | 2 +- pkg/client/clientset/versioned/scheme/doc.go | 2 +- .../clientset/versioned/scheme/register.go | 2 +- .../versioned/typed/tensorflow/v1/doc.go | 2 +- .../versioned/typed/tensorflow/v1/fake/doc.go | 2 +- .../v1/fake/fake_tensorflow_client.go | 2 +- .../typed/tensorflow/v1/fake/fake_tfjob.go | 2 +- .../tensorflow/v1/generated_expansion.go | 2 +- .../typed/tensorflow/v1/tensorflow_client.go | 2 +- .../versioned/typed/tensorflow/v1/tfjob.go | 2 +- .../informers/externalversions/factory.go | 2 +- .../informers/externalversions/generic.go | 2 +- .../internalinterfaces/factory_interfaces.go | 2 +- .../externalversions/tensorflow/interface.go | 2 +- .../tensorflow/v1/interface.go | 2 +- .../externalversions/tensorflow/v1/tfjob.go | 2 +- .../tensorflow/v1/expansion_generated.go | 2 +- pkg/client/listers/tensorflow/v1/tfjob.go | 2 +- sdk/python/docs/TFJobClient.md | 15 +- sdk/python/examples/kubeflow-tfjob-sdk.ipynb | 84 +++----- .../kubeflow/tfjob/api/tf_job_client.py | 183 ++++++++++-------- sdk/python/kubeflow/tfjob/api/tf_job_watch.py | 59 ++++++ sdk/python/requirements.txt | 1 + test/workflows/components/workflows.libsonnet | 10 +- 32 files changed, 229 insertions(+), 175 deletions(-) create mode 100644 sdk/python/kubeflow/tfjob/api/tf_job_watch.py diff --git a/pkg/apis/tensorflow/v1/openapi_generated.go b/pkg/apis/tensorflow/v1/openapi_generated.go index 944f7a79d1..acea0f7955 100644 --- a/pkg/apis/tensorflow/v1/openapi_generated.go +++ b/pkg/apis/tensorflow/v1/openapi_generated.go @@ -1,6 +1,6 @@ // +build !ignore_autogenerated -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/apis/tensorflow/v1/types.go b/pkg/apis/tensorflow/v1/types.go index 6d8ad28efb..9042be7ac3 100644 --- a/pkg/apis/tensorflow/v1/types.go +++ b/pkg/apis/tensorflow/v1/types.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/apis/tensorflow/v1/zz_generated.deepcopy.go b/pkg/apis/tensorflow/v1/zz_generated.deepcopy.go index dedb34a996..29f60522bb 100644 --- a/pkg/apis/tensorflow/v1/zz_generated.deepcopy.go +++ b/pkg/apis/tensorflow/v1/zz_generated.deepcopy.go @@ -1,6 +1,6 @@ // +build !ignore_autogenerated -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/apis/tensorflow/v1/zz_generated.defaults.go b/pkg/apis/tensorflow/v1/zz_generated.defaults.go index fc89223996..8ad59372bd 100644 --- a/pkg/apis/tensorflow/v1/zz_generated.defaults.go +++ b/pkg/apis/tensorflow/v1/zz_generated.defaults.go @@ -1,6 +1,6 @@ // +build !ignore_autogenerated -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/clientset/versioned/clientset.go b/pkg/client/clientset/versioned/clientset.go index cbca9321a7..8e67b71493 100644 --- a/pkg/client/clientset/versioned/clientset.go +++ b/pkg/client/clientset/versioned/clientset.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/clientset/versioned/doc.go b/pkg/client/clientset/versioned/doc.go index a311794354..a228caddfd 100644 --- a/pkg/client/clientset/versioned/doc.go +++ b/pkg/client/clientset/versioned/doc.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/clientset/versioned/fake/clientset_generated.go b/pkg/client/clientset/versioned/fake/clientset_generated.go index ecdbe0fc1d..16dd80f473 100644 --- a/pkg/client/clientset/versioned/fake/clientset_generated.go +++ b/pkg/client/clientset/versioned/fake/clientset_generated.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/clientset/versioned/fake/doc.go b/pkg/client/clientset/versioned/fake/doc.go index 8c0be5ac41..280810d6e3 100644 --- a/pkg/client/clientset/versioned/fake/doc.go +++ b/pkg/client/clientset/versioned/fake/doc.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/clientset/versioned/fake/register.go b/pkg/client/clientset/versioned/fake/register.go index 84578de7e6..adf1d485d6 100644 --- a/pkg/client/clientset/versioned/fake/register.go +++ b/pkg/client/clientset/versioned/fake/register.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/clientset/versioned/scheme/doc.go b/pkg/client/clientset/versioned/scheme/doc.go index d0f752890d..5a9f51a09a 100644 --- a/pkg/client/clientset/versioned/scheme/doc.go +++ b/pkg/client/clientset/versioned/scheme/doc.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/clientset/versioned/scheme/register.go b/pkg/client/clientset/versioned/scheme/register.go index ed8968ecba..aef4051115 100644 --- a/pkg/client/clientset/versioned/scheme/register.go +++ b/pkg/client/clientset/versioned/scheme/register.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/clientset/versioned/typed/tensorflow/v1/doc.go b/pkg/client/clientset/versioned/typed/tensorflow/v1/doc.go index 5c8101df70..3a37e2c520 100644 --- a/pkg/client/clientset/versioned/typed/tensorflow/v1/doc.go +++ b/pkg/client/clientset/versioned/typed/tensorflow/v1/doc.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/clientset/versioned/typed/tensorflow/v1/fake/doc.go b/pkg/client/clientset/versioned/typed/tensorflow/v1/fake/doc.go index ce2fa8a9b1..e664a4c362 100644 --- a/pkg/client/clientset/versioned/typed/tensorflow/v1/fake/doc.go +++ b/pkg/client/clientset/versioned/typed/tensorflow/v1/fake/doc.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/clientset/versioned/typed/tensorflow/v1/fake/fake_tensorflow_client.go b/pkg/client/clientset/versioned/typed/tensorflow/v1/fake/fake_tensorflow_client.go index b5611341b4..7ce8105955 100644 --- a/pkg/client/clientset/versioned/typed/tensorflow/v1/fake/fake_tensorflow_client.go +++ b/pkg/client/clientset/versioned/typed/tensorflow/v1/fake/fake_tensorflow_client.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/clientset/versioned/typed/tensorflow/v1/fake/fake_tfjob.go b/pkg/client/clientset/versioned/typed/tensorflow/v1/fake/fake_tfjob.go index ab6dd06b05..fe4be260e7 100644 --- a/pkg/client/clientset/versioned/typed/tensorflow/v1/fake/fake_tfjob.go +++ b/pkg/client/clientset/versioned/typed/tensorflow/v1/fake/fake_tfjob.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/clientset/versioned/typed/tensorflow/v1/generated_expansion.go b/pkg/client/clientset/versioned/typed/tensorflow/v1/generated_expansion.go index 98be8edca8..c9889ffe17 100644 --- a/pkg/client/clientset/versioned/typed/tensorflow/v1/generated_expansion.go +++ b/pkg/client/clientset/versioned/typed/tensorflow/v1/generated_expansion.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/clientset/versioned/typed/tensorflow/v1/tensorflow_client.go b/pkg/client/clientset/versioned/typed/tensorflow/v1/tensorflow_client.go index 7a176883e7..e7b7630165 100644 --- a/pkg/client/clientset/versioned/typed/tensorflow/v1/tensorflow_client.go +++ b/pkg/client/clientset/versioned/typed/tensorflow/v1/tensorflow_client.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/clientset/versioned/typed/tensorflow/v1/tfjob.go b/pkg/client/clientset/versioned/typed/tensorflow/v1/tfjob.go index b9f3d50576..6da647ed48 100644 --- a/pkg/client/clientset/versioned/typed/tensorflow/v1/tfjob.go +++ b/pkg/client/clientset/versioned/typed/tensorflow/v1/tfjob.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/informers/externalversions/factory.go b/pkg/client/informers/externalversions/factory.go index 52f2478afc..adfd59c1da 100644 --- a/pkg/client/informers/externalversions/factory.go +++ b/pkg/client/informers/externalversions/factory.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/informers/externalversions/generic.go b/pkg/client/informers/externalversions/generic.go index 47cc5c29d9..1892b806e6 100644 --- a/pkg/client/informers/externalversions/generic.go +++ b/pkg/client/informers/externalversions/generic.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/informers/externalversions/internalinterfaces/factory_interfaces.go b/pkg/client/informers/externalversions/internalinterfaces/factory_interfaces.go index 892f7488af..0bffd841c2 100644 --- a/pkg/client/informers/externalversions/internalinterfaces/factory_interfaces.go +++ b/pkg/client/informers/externalversions/internalinterfaces/factory_interfaces.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/informers/externalversions/tensorflow/interface.go b/pkg/client/informers/externalversions/tensorflow/interface.go index a897161eba..4db444d744 100644 --- a/pkg/client/informers/externalversions/tensorflow/interface.go +++ b/pkg/client/informers/externalversions/tensorflow/interface.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/informers/externalversions/tensorflow/v1/interface.go b/pkg/client/informers/externalversions/tensorflow/v1/interface.go index 3cb0c3fcec..d7f7c32098 100644 --- a/pkg/client/informers/externalversions/tensorflow/v1/interface.go +++ b/pkg/client/informers/externalversions/tensorflow/v1/interface.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/informers/externalversions/tensorflow/v1/tfjob.go b/pkg/client/informers/externalversions/tensorflow/v1/tfjob.go index 1406173d0a..b70b646776 100644 --- a/pkg/client/informers/externalversions/tensorflow/v1/tfjob.go +++ b/pkg/client/informers/externalversions/tensorflow/v1/tfjob.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/listers/tensorflow/v1/expansion_generated.go b/pkg/client/listers/tensorflow/v1/expansion_generated.go index fc0f0ce634..4083e1529f 100644 --- a/pkg/client/listers/tensorflow/v1/expansion_generated.go +++ b/pkg/client/listers/tensorflow/v1/expansion_generated.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/client/listers/tensorflow/v1/tfjob.go b/pkg/client/listers/tensorflow/v1/tfjob.go index 82b5432c66..462b446764 100644 --- a/pkg/client/listers/tensorflow/v1/tfjob.go +++ b/pkg/client/listers/tensorflow/v1/tfjob.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Kubeflow Authors +// Copyright 2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/sdk/python/docs/TFJobClient.md b/sdk/python/docs/TFJobClient.md index a043ad622b..150f9c1c85 100644 --- a/sdk/python/docs/TFJobClient.md +++ b/sdk/python/docs/TFJobClient.md @@ -96,7 +96,7 @@ namespace | str | Namespace for tfjob deploying to. If the `namespace` is not de object ## get -> get(name=None, namespace=None) +> get(name=None, namespace=None, watch=False, timeout_seconds=600) Get the created tfjob in the specified namespace @@ -114,7 +114,8 @@ Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- name | str | The TFJob name. If the `name` is not specified, it will get all tfjobs in the namespace.| Optional. | namespace | str | The tfjob's namespace. Defaults to current or default namespace.| Optional | - +watch | bool | Watch the created TFJob if `True`, otherwise will return the created TFJob object. Stop watching if TFJob reaches the optional specified `timeout_seconds` or once the TFJob status `Succeeded` or `Failed`. | Optional | +timeout_seconds | int | Timeout seconds for watching. Defaults to 600. | Optional | ### Return type object @@ -180,6 +181,7 @@ object > namespace=None, > timeout_seconds=600, > polling_interval=30, +> watch=False, > status_callback=None): Wait for the specified job to finish. @@ -191,6 +193,14 @@ from kubeflow.tfjob import TFJobClient tfjob_client = TFJobClient() tfjob_client.wait_for_job('mnist', namespace='kubeflow') + +# The API also supports watching the TFJob status till it's Succeeded or Failed. +tfjob_client.wait_for_job('mnist', namespace=namespace, watch=True) +NAME STATE TIME +mnist Created 2019-12-31T09:20:07Z +mnist Running 2019-12-31T09:20:19Z +mnist Running 2019-12-31T09:20:19Z +mnist Succeeded 2019-12-31T09:22:04Z ``` ### Parameters @@ -201,6 +211,7 @@ namespace | str | The tfjob's namespace. Defaults to current or default namespac timeout_seconds | int | How long to wait for the job, default wait for 600 seconds. | Optional| polling_interval | int | How often to poll for the status of the job.| Optional| status_callback | str | Callable. If supplied this callable is invoked after we poll the job. Callable takes a single argument which is the tfjob.| Optional| +watch | bool | Watch the TFJob if `True`. Stop watching if TFJob reaches the optional specified `timeout_seconds` or once the TFJob status `Succeeded` or `Failed`. | Optional | ### Return type object diff --git a/sdk/python/examples/kubeflow-tfjob-sdk.ipynb b/sdk/python/examples/kubeflow-tfjob-sdk.ipynb index e875cf457b..ff065835db 100644 --- a/sdk/python/examples/kubeflow-tfjob-sdk.ipynb +++ b/sdk/python/examples/kubeflow-tfjob-sdk.ipynb @@ -120,13 +120,13 @@ "text/plain": [ "{'apiVersion': 'kubeflow.org/v1',\n", " 'kind': 'TFJob',\n", - " 'metadata': {'creationTimestamp': '2019-12-17T05:40:26Z',\n", + " 'metadata': {'creationTimestamp': '2019-12-31T09:20:07Z',\n", " 'generation': 1,\n", " 'name': 'mnist',\n", " 'namespace': 'default',\n", - " 'resourceVersion': '13585452',\n", + " 'resourceVersion': '20125141',\n", " 'selfLink': '/apis/kubeflow.org/v1/namespaces/default/tfjobs/mnist',\n", - " 'uid': 'b9faefd7-208f-11ea-9e34-00000a1001ee'},\n", + " 'uid': 'bcb3b867-2bae-11ea-8c04-00000a1001ee'},\n", " 'spec': {'cleanPodPolicy': 'None',\n", " 'tfReplicaSpecs': {'Worker': {'replicas': 1,\n", " 'restartPolicy': 'Never',\n", @@ -166,13 +166,13 @@ "text/plain": [ "{'apiVersion': 'kubeflow.org/v1',\n", " 'kind': 'TFJob',\n", - " 'metadata': {'creationTimestamp': '2019-12-17T05:40:26Z',\n", + " 'metadata': {'creationTimestamp': '2019-12-31T09:20:07Z',\n", " 'generation': 1,\n", " 'name': 'mnist',\n", " 'namespace': 'default',\n", - " 'resourceVersion': '13585464',\n", + " 'resourceVersion': '20125155',\n", " 'selfLink': '/apis/kubeflow.org/v1/namespaces/default/tfjobs/mnist',\n", - " 'uid': 'b9faefd7-208f-11ea-9e34-00000a1001ee'},\n", + " 'uid': 'bcb3b867-2bae-11ea-8c04-00000a1001ee'},\n", " 'spec': {'cleanPodPolicy': 'None',\n", " 'tfReplicaSpecs': {'Worker': {'replicas': 1,\n", " 'restartPolicy': 'Never',\n", @@ -183,14 +183,14 @@ " '--batch_size=150'],\n", " 'image': 'gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0',\n", " 'name': 'tensorflow'}]}}}}},\n", - " 'status': {'conditions': [{'lastTransitionTime': '2019-12-17T05:40:26Z',\n", - " 'lastUpdateTime': '2019-12-17T05:40:26Z',\n", + " 'status': {'conditions': [{'lastTransitionTime': '2019-12-31T09:20:07Z',\n", + " 'lastUpdateTime': '2019-12-31T09:20:07Z',\n", " 'message': 'TFJob mnist is created.',\n", " 'reason': 'TFJobCreated',\n", " 'status': 'True',\n", " 'type': 'Created'}],\n", " 'replicaStatuses': {'Worker': {}},\n", - " 'startTime': '2019-12-17T05:40:26Z'}}" + " 'startTime': '2019-12-31T09:20:09Z'}}" ] }, "execution_count": 5, @@ -217,7 +217,7 @@ { "data": { "text/plain": [ - "'Running'" + "'Created'" ] }, "execution_count": 6, @@ -242,57 +242,19 @@ "metadata": {}, "outputs": [ { - "data": { - "text/plain": [ - "{'apiVersion': 'kubeflow.org/v1',\n", - " 'kind': 'TFJob',\n", - " 'metadata': {'creationTimestamp': '2019-12-17T05:40:26Z',\n", - " 'generation': 1,\n", - " 'name': 'mnist',\n", - " 'namespace': 'default',\n", - " 'resourceVersion': '13586024',\n", - " 'selfLink': '/apis/kubeflow.org/v1/namespaces/default/tfjobs/mnist',\n", - " 'uid': 'b9faefd7-208f-11ea-9e34-00000a1001ee'},\n", - " 'spec': {'cleanPodPolicy': 'None',\n", - " 'tfReplicaSpecs': {'Worker': {'replicas': 1,\n", - " 'restartPolicy': 'Never',\n", - " 'template': {'spec': {'containers': [{'command': ['python',\n", - " '/var/tf_mnist/mnist_with_summaries.py',\n", - " '--log_dir=/train/logs',\n", - " '--learning_rate=0.01',\n", - " '--batch_size=150'],\n", - " 'image': 'gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0',\n", - " 'name': 'tensorflow'}]}}}}},\n", - " 'status': {'completionTime': '2019-12-17T05:42:19Z',\n", - " 'conditions': [{'lastTransitionTime': '2019-12-17T05:40:26Z',\n", - " 'lastUpdateTime': '2019-12-17T05:40:26Z',\n", - " 'message': 'TFJob mnist is created.',\n", - " 'reason': 'TFJobCreated',\n", - " 'status': 'True',\n", - " 'type': 'Created'},\n", - " {'lastTransitionTime': '2019-12-17T05:40:36Z',\n", - " 'lastUpdateTime': '2019-12-17T05:40:36Z',\n", - " 'message': 'TFJob mnist is running.',\n", - " 'reason': 'TFJobRunning',\n", - " 'status': 'False',\n", - " 'type': 'Running'},\n", - " {'lastTransitionTime': '2019-12-17T05:42:19Z',\n", - " 'lastUpdateTime': '2019-12-17T05:42:19Z',\n", - " 'message': 'TFJob mnist successfully completed.',\n", - " 'reason': 'TFJobSucceeded',\n", - " 'status': 'True',\n", - " 'type': 'Succeeded'}],\n", - " 'replicaStatuses': {'Worker': {'succeeded': 1}},\n", - " 'startTime': '2019-12-17T05:40:26Z'}}" - ] - }, - "execution_count": 7, - "metadata": {}, - "output_type": "execute_result" + "name": "stdout", + "output_type": "stream", + "text": [ + "NAME STATE TIME \n", + "mnist Created 2019-12-31T09:20:07Z \n", + "mnist Running 2019-12-31T09:20:19Z \n", + "mnist Running 2019-12-31T09:20:19Z \n", + "mnist Succeeded 2019-12-31T09:22:04Z \n" + ] } ], "source": [ - "tfjob_client.wait_for_job('mnist', namespace=namespace)" + "tfjob_client.wait_for_job('mnist', namespace=namespace, watch=True)" ] }, { @@ -305,7 +267,9 @@ { "cell_type": "code", "execution_count": 8, - "metadata": {}, + "metadata": { + "scrolled": true + }, "outputs": [ { "data": { @@ -344,7 +308,7 @@ " 'details': {'name': 'mnist',\n", " 'group': 'kubeflow.org',\n", " 'kind': 'tfjobs',\n", - " 'uid': 'b9faefd7-208f-11ea-9e34-00000a1001ee'}}" + " 'uid': 'bcb3b867-2bae-11ea-8c04-00000a1001ee'}}" ] }, "execution_count": 9, diff --git a/sdk/python/kubeflow/tfjob/api/tf_job_client.py b/sdk/python/kubeflow/tfjob/api/tf_job_client.py index f253d4f77a..454e59fc69 100644 --- a/sdk/python/kubeflow/tfjob/api/tf_job_client.py +++ b/sdk/python/kubeflow/tfjob/api/tf_job_client.py @@ -19,6 +19,7 @@ from kubeflow.tfjob.constants import constants from kubeflow.tfjob.utils import utils +from .tf_job_watch import watch as tfjob_watch class TFJobClient(object): @@ -68,62 +69,74 @@ def create(self, tfjob, namespace=None): return outputs - def get(self, name=None, namespace=None): + def get(self, name=None, namespace=None, watch=False, timeout_seconds=600): #pylint: disable=inconsistent-return-statements """ Get the tfjob - :param name: existing tfjob name + :param name: existing tfjob name, if not defined, the get all tfjobs in the namespace. :param namespace: defaults to current or default namespace + :param watch: Watch the TFJob if `True`. + :param timeout_seconds: How long to watch the job.. :return: tfjob """ if namespace is None: namespace = utils.get_default_target_namespace() if name: - thread = self.api_instance.get_namespaced_custom_object( - constants.TFJOB_GROUP, - constants.TFJOB_VERSION, - namespace, - constants.TFJOB_PLURAL, - name, - async_req=True) - - tfjobs = None - try: - tfjobs = thread.get(constants.APISERVER_TIMEOUT) - except multiprocessing.TimeoutError: - raise RuntimeError("Timeout trying to get TFJob.") - except client.rest.ApiException as e: - raise RuntimeError( - "Exception when calling CustomObjectsApi->get_namespaced_custom_object:\ - %s\n" % e) - except Exception as e: - raise RuntimeError( - "There was a problem to get TFJob {0} in namespace {1}. Exception: \ - {2} ".format(name, namespace, e)) - + if watch: + tfjob_watch( + name=name, + namespace=namespace, + timeout_seconds=timeout_seconds) + else: + thread = self.api_instance.get_namespaced_custom_object( + constants.TFJOB_GROUP, + constants.TFJOB_VERSION, + namespace, + constants.TFJOB_PLURAL, + name, + async_req=True) + + tfjob = None + try: + tfjob = thread.get(constants.APISERVER_TIMEOUT) + except multiprocessing.TimeoutError: + raise RuntimeError("Timeout trying to get TFJob.") + except client.rest.ApiException as e: + raise RuntimeError( + "Exception when calling CustomObjectsApi->get_namespaced_custom_object:\ + %s\n" % e) + except Exception as e: + raise RuntimeError( + "There was a problem to get TFJob {0} in namespace {1}. Exception: \ + {2} ".format(name, namespace, e)) + return tfjob else: - thread = self.api_instance.list_namespaced_custom_object( - constants.TFJOB_GROUP, - constants.TFJOB_VERSION, - namespace, - constants.TFJOB_PLURAL, - async_req=True) - - tfjobs = None - try: - tfjobs = thread.get(constants.APISERVER_TIMEOUT) - except multiprocessing.TimeoutError: - raise RuntimeError("Timeout trying to get TFJob.") - except client.rest.ApiException as e: - raise RuntimeError( - "Exception when calling CustomObjectsApi->list_namespaced_custom_object:\ - %s\n" % e) - except Exception as e: - raise RuntimeError( - "There was a problem to List TFJob in namespace {0}. \ - Exception: {1} ".format(namespace, e)) - - return tfjobs + if watch: + tfjob_watch( + namespace=namespace, + timeout_seconds=timeout_seconds) + else: + thread = self.api_instance.list_namespaced_custom_object( + constants.TFJOB_GROUP, + constants.TFJOB_VERSION, + namespace, + constants.TFJOB_PLURAL, + async_req=True) + + tfjobs = None + try: + tfjobs = thread.get(constants.APISERVER_TIMEOUT) + except multiprocessing.TimeoutError: + raise RuntimeError("Timeout trying to get TFJob.") + except client.rest.ApiException as e: + raise RuntimeError( + "Exception when calling CustomObjectsApi->list_namespaced_custom_object:\ + %s\n" % e) + except Exception as e: + raise RuntimeError( + "There was a problem to list TFJobs in namespace {0}. \ + Exception: {1} ".format(namespace, e)) + return tfjobs def patch(self, name, tfjob, namespace=None): @@ -177,32 +190,40 @@ def delete(self, name, namespace=None): %s\n" % e) - def wait_for_job(self, name, + def wait_for_job(self, name, #pylint: disable=inconsistent-return-statements namespace=None, timeout_seconds=600, polling_interval=30, + watch=False, status_callback=None): """Wait for the specified job to finish. - Args: - name: Name of the TfJob. - namespace: defaults to current or default namespace. - timeout_seconds: How long to wait for the job. - polling_interval: How often to poll for the status of the job. - status_callback: (Optional): Callable. If supplied this callable is - invoked after we poll the job. Callable takes a single argument which - is the job. + :param name: Name of the TfJob. + :param namespace: defaults to current or default namespace. + :param timeout_seconds: How long to wait for the job. + :param polling_interval: How often to poll for the status of the job. + :param watch: Watch the TFJob if `True`. + :param status_callback: (Optional): Callable. If supplied this callable is + invoked after we poll the job. Callable takes a single argument which + is the job. + :return: """ if namespace is None: namespace = utils.get_default_target_namespace() - return self.wait_for_condition( - name, - ["Succeeded", "Failed"], - namespace=namespace, - timeout_seconds=timeout_seconds, - polling_interval=polling_interval, - status_callback=status_callback) + if watch: + tfjob_watch( + name=name, + namespace=namespace, + timeout_seconds=timeout_seconds) + else: + return self.wait_for_condition( + name, + ["Succeeded", "Failed"], + namespace=namespace, + timeout_seconds=timeout_seconds, + polling_interval=polling_interval, + status_callback=status_callback) def wait_for_condition(self, name, @@ -213,16 +234,16 @@ def wait_for_condition(self, name, status_callback=None): """Waits until any of the specified conditions occur. - Args: - name: Name of the job. - expected_condition: A list of conditions. Function waits until any of the - supplied conditions is reached. - namespace: defaults to current or default namespace. - timeout_seconds: How long to wait for the job. - polling_interval: How often to poll for the status of the job. - status_callback: (Optional): Callable. If supplied this callable is - invoked after we poll the job. Callable takes a single argument which - is the job. + :param name: Name of the job. + :param expected_condition: A list of conditions. Function waits until any of the + supplied conditions is reached. + :param namespace: defaults to current or default namespace. + :param timeout_seconds: How long to wait for the job. + :param polling_interval: How often to poll for the status of the job. + :param status_callback: (Optional): Callable. If supplied this callable is + invoked after we poll the job. Callable takes a single argument which + is the job. + :return: Object TFJob status """ if namespace is None: @@ -255,9 +276,9 @@ def wait_for_condition(self, name, def get_job_status(self, name, namespace=None): """Returns TFJob status, such as Running, Failed or Succeeded. - Args: - name: The TFJob name. - namespace: defaults to current or default namespace. + :param name: The TFJob name. + :param namespace: defaults to current or default namespace. + :return: Object TFJob status """ if namespace is None: namespace = utils.get_default_target_namespace() @@ -270,9 +291,9 @@ def get_job_status(self, name, namespace=None): def is_job_running(self, name, namespace=None): """Returns true if the TFJob running; false otherwise. - Args: - name: The TFJob name. - namespace: defaults to current or default namespace. + :param name: The TFJob name. + :param namespace: defaults to current or default namespace. + :return: True or False """ tfjob_status = self.get_job_status(name, namespace=namespace) return tfjob_status.lower() == "running" @@ -281,9 +302,9 @@ def is_job_running(self, name, namespace=None): def is_job_succeeded(self, name, namespace=None): """Returns true if the TFJob succeeded; false otherwise. - Args: - name: The TFJob name. - namespace: defaults to current or default namespace. + :param name: The TFJob name. + :param namespace: defaults to current or default namespace. + :return: True or False """ tfjob_status = self.get_job_status(name, namespace=namespace) return tfjob_status.lower() == "succeeded" diff --git a/sdk/python/kubeflow/tfjob/api/tf_job_watch.py b/sdk/python/kubeflow/tfjob/api/tf_job_watch.py new file mode 100644 index 0000000000..4e5f0781a2 --- /dev/null +++ b/sdk/python/kubeflow/tfjob/api/tf_job_watch.py @@ -0,0 +1,59 @@ +# Copyright 2019 The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import retrying +from kubernetes import client +from kubernetes import watch as k8s_watch +from table_logger import TableLogger + +from kubeflow.tfjob.constants import constants +from kubeflow.tfjob.utils import utils + +tbl = TableLogger( + columns='NAME,STATE,TIME', + colwidth={'NAME': 30, 'STATE':20, 'TIME':30}, + border=False) + +@retrying.retry(wait_fixed=1000, stop_max_attempt_number=20) +def watch(name=None, namespace=None, timeout_seconds=600): + """Watch the created or patched InferenceService in the specified namespace""" + + if namespace is None: + namespace = utils.get_default_target_namespace() + + stream = k8s_watch.Watch().stream( + client.CustomObjectsApi().list_namespaced_custom_object, + constants.TFJOB_GROUP, + constants.TFJOB_VERSION, + namespace, + constants.TFJOB_PLURAL, + timeout_seconds=timeout_seconds) + + for event in stream: + tfjob = event['object'] + tfjob_name = tfjob['metadata']['name'] + if name and name != tfjob_name: + continue + else: + status = '' + update_time = '' + last_condition = tfjob.get('status', {}).get('conditions', [])[-1] + status = last_condition.get('type', '') + update_time = last_condition.get('lastTransitionTime', '') + + tbl(tfjob_name, status, update_time) + + if name == tfjob_name: + if status == 'Succeeded' or status == 'Failed': + break diff --git a/sdk/python/requirements.txt b/sdk/python/requirements.txt index 27f8e2d7e2..e59cde657a 100644 --- a/sdk/python/requirements.txt +++ b/sdk/python/requirements.txt @@ -4,3 +4,4 @@ python_dateutil>=2.5.3 setuptools>=21.0.0 urllib3>=1.15.1 kubernetes>=10.0.1 +table_logger>=0.3.5 diff --git a/test/workflows/components/workflows.libsonnet b/test/workflows/components/workflows.libsonnet index 345d895e7f..b962ba0e4f 100644 --- a/test/workflows/components/workflows.libsonnet +++ b/test/workflows/components/workflows.libsonnet @@ -433,12 +433,10 @@ "--suffix=" + params.tfJobVersion, ]), // copy-artifacts $.parts(namespace, name, overrides).e2e(prow_env, bucket).buildTemplate("tfjob-sdk-tests", [ - "pytest", - "sdk/python/test", - "--log-cli-level=info", - "--log-cli-format='%(levelname)s|%(asctime)s|%(pathname)s|%(lineno)d| %(message)s'", - "--junitxml=" + artifactsDir + "/junit_sdk-test.xml", - ]), // copy-artifacts + "/bin/sh", + "-xc", + "pip3 install -r sdk/python/requirements.txt; pytest sdk/python/test --log-cli-level=info --log-cli-format='%(levelname)s|%(asctime)s|%(pathname)s|%(lineno)d| %(message)s' --junitxml=" + artifactsDir + "/junit_sdk-test.xml" + ]), // tfjob-sdk-tests ], // templates }, }, // e2e