Skip to content

Commit

Permalink
Support success policy for TFJob (#1165)
Browse files Browse the repository at this point in the history
* Support success policy for TFJob

Signed-off-by: terrytangyuan <[email protected]>

* Fix imports

Signed-off-by: terrytangyuan <[email protected]>

* Update generated code

Signed-off-by: terrytangyuan <[email protected]>

* Fix defaults_test.go

Signed-off-by: terrytangyuan <[email protected]>
  • Loading branch information
terrytangyuan authored Jun 1, 2020
1 parent 07baabf commit 2479837
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 2 deletions.
23 changes: 23 additions & 0 deletions pkg/apis/tensorflow/v1/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2020 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v1

// SuccessPolicy is the success policy.
type SuccessPolicy string

const (
SuccessPolicyDefault SuccessPolicy = ""
SuccessPolicyAllWorkers SuccessPolicy = "AllWorkers"
)
5 changes: 5 additions & 0 deletions pkg/apis/tensorflow/v1/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ func SetDefaults_TFJob(tfjob *TFJob) {
running := common.CleanPodPolicyRunning
tfjob.Spec.CleanPodPolicy = &running
}
// Set default success policy to "".
if tfjob.Spec.SuccessPolicy == nil {
defaultPolicy := SuccessPolicyDefault
tfjob.Spec.SuccessPolicy = &defaultPolicy
}

// Update the key of TFReplicaSpecs to camel case.
setTypeNamesToCamelCase(tfjob)
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/tensorflow/v1/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,11 @@ func expectedTFJob(cleanPodPolicy common.CleanPodPolicy, restartPolicy common.Re
)
}

defaultSuccessPolicy := SuccessPolicyDefault

return &TFJob{
Spec: TFJobSpec{
SuccessPolicy: &defaultSuccessPolicy,
CleanPodPolicy: &cleanPodPolicy,
TFReplicaSpecs: map[TFReplicaType]*common.ReplicaSpec{
TFReplicaTypeWorker: &common.ReplicaSpec{
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/tensorflow/v1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/apis/tensorflow/v1/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@
"type": "integer",
"format": "int32"
},
"successPolicy": {
"description": "SuccessPolicy defines the policy to mark the TFJob as succeeded. Default to \"\", using the default rules.",
"type": "string"
},
"cleanPodPolicy": {
"description": "Defines the policy for cleaning up pods after the TFJob completes. Defaults to Running.",
"type": "string"
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/tensorflow/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ type TFJobSpec struct {
// +optional
BackoffLimit *int32 `json:"backoffLimit,omitempty"`

// SuccessPolicy defines the policy to mark the TFJob as succeeded.
// Default to "", using the default rules.
// +optional
SuccessPolicy *SuccessPolicy `json:"successPolicy,omitempty"`

// Defines the policy for cleaning up pods after the TFJob completes.
// Defaults to Running.
// +optional
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/tensorflow/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pkg/common/util/v1/testutil/tfjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ func NewTFJobWithEvaluator(worker, ps, evaluator int) *tfv1.TFJob {
return tfJob
}

func NewTFJobWithSuccessPolicy(worker, ps int, successPolicy tfv1.SuccessPolicy) *tfv1.TFJob {
tfJob := NewTFJob(worker, ps)
tfJob.Spec.SuccessPolicy = &successPolicy
return tfJob
}

func NewTFJob(worker, ps int) *tfv1.TFJob {
tfJob := &tfv1.TFJob{
TypeMeta: metav1.TypeMeta{
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller.v1/tensorflow/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,10 @@ func (tc *TFController) updateStatusSingle(tfjob *tfv1.TFJob, rtype tfv1.TFRepli
}
} else {
if rtype == tfv1.TFReplicaTypeWorker {
// All workers are succeeded or worker 0 completed, leave a succeeded condition.
if expected == 0 || worker0Completed {
// Leave a succeeded condition for the following two cases:
// 1. If default success policy is used and worker 0 has completed.
// 2. If `SuccessPolicyAllWorkers` success policy is used and all workers are succeeded.
if expected == 0 || (worker0Completed && *tfjob.Spec.SuccessPolicy != tfv1.SuccessPolicyAllWorkers) {
msg := fmt.Sprintf("TFJob %s successfully completed.", tfjob.Name)
tc.Recorder.Event(tfjob, v1.EventTypeNormal, tfJobSucceededReason, msg)
if tfjob.Status.CompletionTime == nil {
Expand Down
48 changes: 48 additions & 0 deletions pkg/controller.v1/tensorflow/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,54 @@ func TestStatus(t *testing.T) {
worker0Completed: true,
expectedType: common.JobSucceeded,
},
testCase{
description: "(No chief worker, successPolicy: AllWorkers) worker-0 are succeeded, 3 workers are active",
tfJob: testutil.NewTFJobWithSuccessPolicy(4, 0, tfv1.SuccessPolicyAllWorkers),
expectedFailedPS: 0,
expectedSucceededPS: 0,
expectedActivePS: 0,
expectedFailedWorker: 0,
expectedSucceededWorker: 1,
expectedActiveWorker: 3,
expectedFailedChief: 0,
expectedSucceededChief: 0,
expectedActiveChief: 0,
restart: false,
worker0Completed: true,
expectedType: common.JobRunning,
},
testCase{
description: "(No chief worker, successPolicy: AllWorkers) 4 workers are succeeded",
tfJob: testutil.NewTFJobWithSuccessPolicy(4, 0, tfv1.SuccessPolicyAllWorkers),
expectedFailedPS: 0,
expectedSucceededPS: 0,
expectedActivePS: 0,
expectedFailedWorker: 0,
expectedSucceededWorker: 4,
expectedActiveWorker: 0,
expectedFailedChief: 0,
expectedSucceededChief: 0,
expectedActiveChief: 0,
restart: false,
worker0Completed: true,
expectedType: common.JobSucceeded,
},
testCase{
description: "(No chief worker, successPolicy: AllWorkers) worker-0 is succeeded, 2 workers are running, 1 worker is failed",
tfJob: testutil.NewTFJobWithSuccessPolicy(4, 0, tfv1.SuccessPolicyAllWorkers),
expectedFailedPS: 0,
expectedSucceededPS: 0,
expectedActivePS: 0,
expectedFailedWorker: 1,
expectedSucceededWorker: 1,
expectedActiveWorker: 2,
expectedFailedChief: 0,
expectedSucceededChief: 0,
expectedActiveChief: 0,
restart: false,
worker0Completed: true,
expectedType: common.JobFailed,
},
testCase{
description: "Chief is running, workers are failed",
tfJob: testutil.NewTFJobWithChief(4, 2),
Expand Down

0 comments on commit 2479837

Please sign in to comment.