From fa9fcc42bf839828762865fa50fe3bb081163c2e Mon Sep 17 00:00:00 2001 From: TommyLike Date: Sun, 5 May 2019 10:44:42 +0800 Subject: [PATCH] Combine input & output volumes --- pkg/admission/admit_job.go | 7 +- pkg/admission/mutate_job.go | 35 +-------- pkg/apis/batch/v1alpha1/job.go | 22 +++--- .../batch/v1alpha1/zz_generated.deepcopy.go | 15 ++-- pkg/controllers/job/helpers/helpers.go | 15 +++- pkg/controllers/job/job_controller_util.go | 77 ++++++------------- 6 files changed, 60 insertions(+), 111 deletions(-) diff --git a/pkg/admission/admit_job.go b/pkg/admission/admit_job.go index 41c664a4c49..93eea205962 100644 --- a/pkg/admission/admit_job.go +++ b/pkg/admission/admit_job.go @@ -144,10 +144,9 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st } } - //TODO(tommylikehu): Fix me and enable it. - //if validateInfo, ok := ValidateIO(job.Spec.Volumes); ok { - // msg = msg + validateInfo - //} + if validateInfo, ok := ValidateIO(job.Spec.Volumes); ok { + msg = msg + validateInfo + } if msg != "" { reviewResponse.Allowed = false diff --git a/pkg/admission/mutate_job.go b/pkg/admission/mutate_job.go index 63d0c270a90..f7ed88f2c24 100644 --- a/pkg/admission/mutate_job.go +++ b/pkg/admission/mutate_job.go @@ -19,16 +19,13 @@ package admission import ( "encoding/json" "fmt" - "math/rand" - "strconv" - "time" - "github.com/golang/glog" + "strconv" "k8s.io/api/admission/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - v1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + "volcano.sh/volcano/pkg/apis/batch/v1alpha1" ) type patchOperation struct { @@ -74,7 +71,6 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { func createPatch(job v1alpha1.Job) ([]byte, error) { var patch []patchOperation patch = append(patch, mutateSpec(job.Spec.Tasks, "/spec/tasks")...) - patch = append(patch, mutateMetadata(job.ObjectMeta, "/metadata")...) return json.Marshal(patch) } @@ -95,30 +91,3 @@ func mutateSpec(tasks []v1alpha1.TaskSpec, basePath string) (patch []patchOperat return patch } - -func mutateMetadata(metadata metav1.ObjectMeta, basePath string) (patch []patchOperation) { - if len(metadata.Annotations) == 0 { - metadata.Annotations = make(map[string]string) - } - randomStr := genRandomStr(5) - metadata.Annotations[PVCInputName] = fmt.Sprintf("%s-input-%s", metadata.Name, randomStr) - metadata.Annotations[PVCOutputName] = fmt.Sprintf("%s-output-%s", metadata.Name, randomStr) - patch = append(patch, patchOperation{ - Op: "replace", - Path: basePath, - Value: metadata, - }) - - return patch -} - -func genRandomStr(l int) string { - str := "0123456789abcdefghijklmnopqrstuvwxyz" - bytes := []byte(str) - result := []byte{} - r := rand.New(rand.NewSource(time.Now().UnixNano())) - for i := 0; i < l; i++ { - result = append(result, bytes[r.Intn(len(bytes))]) - } - return string(result) -} diff --git a/pkg/apis/batch/v1alpha1/job.go b/pkg/apis/batch/v1alpha1/job.go index f0d50c61761..84b4f576eb5 100644 --- a/pkg/apis/batch/v1alpha1/job.go +++ b/pkg/apis/batch/v1alpha1/job.go @@ -47,32 +47,29 @@ type JobSpec struct { // +optional MinAvailable int32 `json:"minAvailable,omitempty" protobuf:"bytes,2,opt,name=minAvailable"` - // The volume mount for input of Job - Input *VolumeSpec `json:"input,omitempty" protobuf:"bytes,3,opt,name=input"` - - // The volume mount for output of Job - Output *VolumeSpec `json:"output,omitempty" protobuf:"bytes,4,opt,name=output"` + // The volumes mount on Job + Volumes []VolumeSpec `json:"volumes,omitempty" protobuf:"bytes,3,opt,name=volumes"` // Tasks specifies the task specification of Job // +optional - Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,5,opt,name=tasks"` + Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,4,opt,name=tasks"` // Specifies the default lifecycle of tasks // +optional - Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,6,opt,name=policies"` + Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,5,opt,name=policies"` // Specifies the plugin of job // Key is plugin name, value is the arguments of the plugin // +optional - Plugins map[string][]string `json:"plugins,omitempty" protobuf:"bytes,7,opt,name=plugins"` + Plugins map[string][]string `json:"plugins,omitempty" protobuf:"bytes,6,opt,name=plugins"` //Specifies the queue that will be used in the scheduler, "default" queue is used this leaves empty. - Queue string `json:"queue,omitempty" protobuf:"bytes,8,opt,name=queue"` + Queue string `json:"queue,omitempty" protobuf:"bytes,7,opt,name=queue"` // Specifies the maximum number of retries before marking this Job failed. // Defaults to 3. // +optional - MaxRetry int32 `json:"maxRetry,omitempty" protobuf:"bytes,9,opt,name=maxRetry"` + MaxRetry int32 `json:"maxRetry,omitempty" protobuf:"bytes,8,opt,name=maxRetry"` } // VolumeSpec defines the specification of Volume, e.g. PVC @@ -81,8 +78,11 @@ type VolumeSpec struct { // not contain ':'. MountPath string `json:"mountPath" protobuf:"bytes,1,opt,name=mountPath"` + // defined the PVC name + VolumeClaimName string `json:"volumeClaimName,omitempty" protobuf:"bytes,2,opt,name=volumeClaimName"` + // VolumeClaim defines the PVC used by the VolumeMount. - VolumeClaim *v1.PersistentVolumeClaimSpec `json:"volumeClaim,omitempty" protobuf:"bytes,1,opt,name=volumeClaim"` + VolumeClaim *v1.PersistentVolumeClaimSpec `json:"volumeClaim,omitempty" protobuf:"bytes,3,opt,name=volumeClaim"` } type JobEvent string diff --git a/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go index 6b844540fbb..ab368e414e7 100644 --- a/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/batch/v1alpha1/zz_generated.deepcopy.go @@ -90,15 +90,12 @@ func (in *JobList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *JobSpec) DeepCopyInto(out *JobSpec) { *out = *in - if in.Input != nil { - in, out := &in.Input, &out.Input - *out = new(VolumeSpec) - (*in).DeepCopyInto(*out) - } - if in.Output != nil { - in, out := &in.Output, &out.Output - *out = new(VolumeSpec) - (*in).DeepCopyInto(*out) + if in.Volumes != nil { + in, out := &in.Volumes, &out.Volumes + *out = make([]VolumeSpec, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } } if in.Tasks != nil { in, out := &in.Tasks, &out.Tasks diff --git a/pkg/controllers/job/helpers/helpers.go b/pkg/controllers/job/helpers/helpers.go index 93a42f5fba6..ca44a7cadc9 100644 --- a/pkg/controllers/job/helpers/helpers.go +++ b/pkg/controllers/job/helpers/helpers.go @@ -18,9 +18,11 @@ package helpers import ( "fmt" + "math/rand" "strings" + "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" ) const ( @@ -39,3 +41,14 @@ func GetTaskIndex(pod *v1.Pod) string { func MakePodName(jobName string, taskName string, index int) string { return fmt.Sprintf(PodNameFmt, jobName, taskName, index) } + +func GenRandomStr(l int) string { + str := "0123456789abcdefghijklmnopqrstuvwxyz" + bytes := []byte(str) + result := []byte{} + r := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < l; i++ { + result = append(result, bytes[r.Intn(len(bytes))]) + } + return string(result) +} diff --git a/pkg/controllers/job/job_controller_util.go b/pkg/controllers/job/job_controller_util.go index 085c3a83325..cd863359986 100644 --- a/pkg/controllers/job/job_controller_util.go +++ b/pkg/controllers/job/job_controller_util.go @@ -21,12 +21,10 @@ import ( "github.com/golang/glog" - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - kbapi "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - admissioncontroller "volcano.sh/volcano/pkg/admission" vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" "volcano.sh/volcano/pkg/apis/helpers" "volcano.sh/volcano/pkg/controllers/apis" @@ -54,7 +52,7 @@ func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod { pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: MakePodName(job.Name, template.Name, ix), + Name: vkjobhelpers.MakePodName(job.Name, template.Name, ix), Namespace: job.Namespace, OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(job, helpers.JobKind), @@ -70,61 +68,34 @@ func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod { pod.Spec.SchedulerName = job.Spec.SchedulerName } - inputPVC := job.Annotations[admissioncontroller.PVCInputName] - outputPVC := job.Annotations[admissioncontroller.PVCOutputName] - if job.Spec.Output != nil { - if job.Spec.Output.VolumeClaim == nil { - volume := v1.Volume{ - Name: outputPVC, - } - volume.EmptyDir = &v1.EmptyDirVolumeSource{} - pod.Spec.Volumes = append(pod.Spec.Volumes, volume) - } else { - volume := v1.Volume{ - Name: outputPVC, - } - volume.PersistentVolumeClaim = &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: outputPVC, - } - - pod.Spec.Volumes = append(pod.Spec.Volumes, volume) - } - - for i, c := range pod.Spec.Containers { - vm := v1.VolumeMount{ - MountPath: job.Spec.Output.MountPath, - Name: outputPVC, - } - pod.Spec.Containers[i].VolumeMounts = append(c.VolumeMounts, vm) - } - } - - if job.Spec.Input != nil { - if job.Spec.Input.VolumeClaim == nil { - volume := v1.Volume{ - Name: inputPVC, - } - volume.EmptyDir = &v1.EmptyDirVolumeSource{} - pod.Spec.Volumes = append(pod.Spec.Volumes, volume) - } else { - volume := v1.Volume{ - Name: inputPVC, - } - volume.PersistentVolumeClaim = &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: inputPVC, + volumeMap := make(map[string]bool) + for _, volume := range job.Spec.Volumes { + vcName := volume.VolumeClaimName + if _, ok := volumeMap[vcName]; !ok { + if _, ok := job.Status.ControlledResources["volume-emptyDir-"+vcName]; ok && volume.VolumeClaim == nil { + volume := v1.Volume{ + Name: vcName, + } + volume.EmptyDir = &v1.EmptyDirVolumeSource{} + pod.Spec.Volumes = append(pod.Spec.Volumes, volume) + } else { + volume := v1.Volume{ + Name: vcName, + } + volume.PersistentVolumeClaim = &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: vcName, + } + pod.Spec.Volumes = append(pod.Spec.Volumes, volume) } - - pod.Spec.Volumes = append(pod.Spec.Volumes, volume) + volumeMap[vcName] = true } for i, c := range pod.Spec.Containers { vm := v1.VolumeMount{ - MountPath: job.Spec.Input.MountPath, - Name: inputPVC, + MountPath: volume.MountPath, + Name: vcName, } - pod.Spec.Containers[i].VolumeMounts = append(c.VolumeMounts, vm) - } }