Skip to content

Commit

Permalink
Merge pull request #140 from TommyLike/bug/opt_life_cycle
Browse files Browse the repository at this point in the history
Fix combine volume feature issues&Adding testcase
  • Loading branch information
volcano-sh-bot authored May 8, 2019
2 parents d59fc36 + f7f2dee commit 7ecec77
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 119 deletions.
8 changes: 4 additions & 4 deletions example/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ spec:
policies:
- event: PodEvicted
action: RestartJob
input:
mountPath: "/myinput"
output:
mountPath: "/myoutput"
volumes:
- mountPath: "/myinput"
- mountPath: "/myoutput"
volumeClaimName: "testvolumeclaimname"
volumeClaim:
accessModes: [ "ReadWriteOnce" ]
storageClassName: "my-storage-class"
Expand Down
43 changes: 17 additions & 26 deletions installer/chart/volcano/templates/batch_v1alpha1_job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,36 +32,27 @@ spec:
description: Specification of the desired behavior of a cron job, including
the minAvailable
properties:
input:
description: The volume mount for input of Job
properties:
volumeClaim:
description: VolumeClaim defines the PVC used by the VolumeMount.
type: object
mountPath:
description: Path within the container at which the volume should
be mounted. Must not contain ':'.
type: string
required:
- mountPath
type: object
volumes:
description: The volumes for Job
items:
properties:
volumeClaim:
description: VolumeClaim defines the PVC used by the VolumeMount.
type: object
mountPath:
description: Path within the container at which the volume should be mounted.
Must not contain ':'.
type: string
volumeClaimName:
description: The name of the volume claim.
type: object
required:
- mountPath
type: array
minAvailable:
description: The minimal available pods to run for this Job
format: int32
type: integer
output:
description: The volume mount for output of Job
properties:
volumeClaim:
description: VolumeClaim defines the PVC used by the VolumeMount.
type: object
mountPath:
description: Path within the container at which the volume should
be mounted. Must not contain ':'.
type: string
required:
- mountPath
type: object
policies:
description: Specifies the default lifecycle of tasks
items:
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/batch/v1alpha1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ type JobEvent string
const (
CommandIssued JobEvent = "CommandIssued"
PluginError JobEvent = "PluginError"
PVCError JobEvent = "PVCError"
PodGroupError JobEvent = "PodGroupError"
)

// Event represent the phase of Job, e.g. pod-failed.
Expand Down
14 changes: 9 additions & 5 deletions pkg/controllers/job/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ package helpers

import (
"fmt"
"k8s.io/api/core/v1"
"math/rand"
"strings"
"time"

v1 "k8s.io/api/core/v1"
)

const (
PodNameFmt = "%s-%s-%d"
PodNameFmt = "%s-%s-%d"
VolumeClaimFmt = "%s-volume-%s"
)

func GetTaskIndex(pod *v1.Pod) string {
Expand All @@ -42,13 +42,17 @@ func MakePodName(jobName string, taskName string, index int) string {
return fmt.Sprintf(PodNameFmt, jobName, taskName, index)
}

func GenRandomStr(l int) string {
func genRandomStr(l int) string {
str := "0123456789abcdefghijklmnopqrstuvwxyz"
bytes := []byte(str)
result := []byte{}
var result []byte
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < l; i++ {
result = append(result, bytes[r.Intn(len(bytes))])
}
return string(result)
}

func MakeVolumeClaimName(jobName string) string {
return fmt.Sprintf(VolumeClaimFmt, jobName, genRandomStr(12))
}
134 changes: 51 additions & 83 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,40 +138,45 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
return nil
}

func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateStatusFn) error {
func (cc *Controller) createJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {
glog.V(3).Infof("Starting to create Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name)
defer glog.V(3).Infof("Finished Job <%s/%s> create", jobInfo.Job.Namespace, jobInfo.Job.Name)

job := jobInfo.Job.DeepCopy()
glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)

if update, err := cc.filljob(job); err != nil || update {
return err
}

if err := cc.pluginOnJobAdd(job); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PluginError),
fmt.Sprintf("Execute plugin when job add failed, err: %v", err))
return err
}

if err := cc.createPodGroupIfNotExist(job); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PodGroupError),
fmt.Sprintf("Failed to create PodGroup, err: %v", err))
return err
}

if err := cc.createJobIOIfNotExist(job); err != nil {
err, job := cc.createJobIOIfNotExist(job)
if err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PVCError),
fmt.Sprintf("Failed to create PVC, err: %v", err))
return err
}

if updateStatus != nil {
updateStatus(&job.Status)
}

if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
} else {
if e := cc.cache.Update(job); e != nil {
if err := cc.cache.Update(job); err != nil {
glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v",
job.Namespace, job.Name, e)
return e
job.Namespace, job.Name, err)
return err
}
}

Expand Down Expand Up @@ -329,70 +334,61 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
return nil
}

func (cc *Controller) calculateVersion(current int32, bumpVersion bool) int32 {
if current == 0 {
current += 1
}
if bumpVersion {
current += 1
}
return current
}

func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error {
func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) (error, *vkv1.Job) {
// If PVC does not exist, create them for Job.
var needUpdate, nameExist bool
volumes := job.Spec.Volumes
for _, volume := range volumes {
for index, volume := range volumes {
nameExist = false
vcName := volume.VolumeClaimName
exist, err := cc.checkPVCExist(job, vcName)
if err != nil {
return err
if len(vcName) == 0 {
//NOTE(k82cn): Ensure never have duplicated generated names.
for {
vcName = vkjobhelpers.MakeVolumeClaimName(job.Name)
exist, err := cc.checkPVCExist(job, vcName)
if err != nil {
return err, nil
}
if exist {
continue
}
job.Spec.Volumes[index].VolumeClaimName = vcName
needUpdate = true
break
}
} else {
exist, err := cc.checkPVCExist(job, vcName)
if err != nil {
return err, nil
}
nameExist = exist
}
if !exist {

if !nameExist {
if job.Status.ControlledResources == nil {
job.Status.ControlledResources = make(map[string]string)
}
if volume.VolumeClaim != nil {
if err := cc.createPVC(job, vcName, volume.VolumeClaim); err != nil {
return err
return err, nil
}
job.Status.ControlledResources["volume-pvc-"+vcName] = vcName
} else {
job.Status.ControlledResources["volume-emptyDir-"+vcName] = vcName
}
}
}
return nil
}

func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (bool, *vkv1.Job, error) {
// If VolumeClaimName does not exist, generate them for Job.
var newJob *vkv1.Job
volumes := job.Spec.Volumes
update := false
for index, volume := range volumes {
vcName := volume.VolumeClaimName
if len(vcName) == 0 {
for {
randomStr := vkjobhelpers.GenRandomStr(12)
vcName = fmt.Sprintf("%s-volume-%s", job.Name, randomStr)
exist, err := cc.checkPVCExist(job, vcName)
if err != nil {
return false, nil, err
}
if exist {
continue
}
if newJob == nil {
newJob = job.DeepCopy()
}
newJob.Spec.Volumes[index].VolumeClaimName = vcName
update = true
break
}
if needUpdate {
newJob, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(job)
if err != nil {
glog.Errorf("Failed to update Job %v/%v for volume claim name: %v ",
job.Namespace, job.Name, err)
return err, nil
} else {
return nil, newJob
}
}
return update, newJob, nil
return nil, job
}

func (cc *Controller) checkPVCExist(job *vkv1.Job, vcName string) (bool, error) {
Expand Down Expand Up @@ -506,31 +502,3 @@ func (cc *Controller) calcPGMinResources(job *vkv1.Job) *v1.ResourceList {

return minAvailableTasksRes.Convert2K8sResource()
}

func (cc *Controller) filljob(job *vkv1.Job) (bool, error) {
update, newJob, err := cc.needUpdateForVolumeClaim(job)
if err != nil {
return false, err
}
if update {
if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil {
glog.Errorf("Failed to update Job %v/%v: %v",
job.Namespace, job.Name, err)
return false, err
}
return true, nil
} else if job.Status.State.Phase == "" {
job.Status.State.Phase = vkv1.Pending
if j, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
} else {
if e := cc.cache.Update(j); e != nil {
glog.Error("Failed to update cache status of Job %v/%v: %v", job.Namespace, job.Name, e)
}
}
return true, nil
}

return false, nil
}
4 changes: 3 additions & 1 deletion pkg/controllers/job/state/pending.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func (ps *pendingState) Execute(action vkv1.Action) error {
status.State.Phase = phase
})
default:
return CreateJob(ps.job, nil)
return CreateJob(ps.job, func(status *vkv1.JobStatus) {
status.State.Phase = vkv1.Pending
})
}
}
71 changes: 71 additions & 0 deletions test/e2e/job_controlled_resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
Copyright 2019 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
)

var _ = Describe("Job E2E Test: Test Job PVCs", func() {
It("Generate PVC name if not specified", func() {
jobName := "job-pvc-name-empty"
namespace := "test"
taskName := "task"
pvcName := "specifiedpvcname"
context := initTestContext()
defer cleanupTestContext(context)

job := createJob(context, &jobSpec{
namespace: namespace,
name: jobName,
tasks: []taskSpec{
{
img: defaultNginxImage,
req: oneCPU,
min: 1,
rep: 1,
name: taskName,
},
},
volumes: []v1alpha1.VolumeSpec{
{
MountPath: "/mountone",
VolumeClaimName: pvcName,
},
{
MountPath: "/mounttwo",
},
},
})

err := waitJobReady(context, job)
Expect(err).NotTo(HaveOccurred())

job, err = context.vkclient.BatchV1alpha1().Jobs(namespace).Get(jobName, v1.GetOptions{})
Expect(err).NotTo(HaveOccurred())

Expect(len(job.Spec.Volumes)).To(Equal(2),
"Two volumes should be created")
for _, volume := range job.Spec.Volumes {
Expect(volume.VolumeClaimName).Should(Or(ContainSubstring(jobName), Equal(pvcName)),
"PVC name should be generated for manually specified.")
}
})
})
Loading

0 comments on commit 7ecec77

Please sign in to comment.