Skip to content

Commit

Permalink
remove inqueue job phase
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyuqing4 committed Aug 20, 2019
1 parent e2faed8 commit 9fc4c04
Show file tree
Hide file tree
Showing 16 changed files with 49 additions and 666 deletions.
2 changes: 0 additions & 2 deletions docs/design/job-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -648,8 +648,6 @@ const (
Terminated JobPhase = "Terminated"
// Failed is the phase that the job is restarted failed reached the maximum number of retries.
Failed JobPhase = "Failed"
// Inqueue is the phase that cluster have idle resource to schedule the job
Inqueue JobPhase = "Inqueue"
)

// JobState contains details for the current state of the job.
Expand Down
2 changes: 0 additions & 2 deletions pkg/apis/batch/v1alpha1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,6 @@ const (
Terminated JobPhase = "Terminated"
// Failed is the phase that the job is restarted failed reached the maximum number of retries.
Failed JobPhase = "Failed"
// Inqueue is the phase that cluster have idle resource to schedule the job
Inqueue JobPhase = "Inqueue"
)

// JobState contains details for the current state of the job.
Expand Down
1 change: 0 additions & 1 deletion pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ func NewJobController(
// Register actions
state.SyncJob = cc.syncJob
state.KillJob = cc.killJob
state.CreateJob = cc.createJob

return cc
}
Expand Down
43 changes: 13 additions & 30 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,58 +143,34 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseM
return nil
}

func (cc *Controller) createJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {
glog.V(3).Infof("Starting to create Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name)
defer glog.V(3).Infof("Finished Job <%s/%s> create", jobInfo.Job.Namespace, jobInfo.Job.Name)

job := jobInfo.Job.DeepCopy()
glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)

func (cc *Controller) createJob(job *vkv1.Job) (*vkv1.Job, error) {
job, err := cc.initJobStatus(job)
if err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.JobStatusError),
fmt.Sprintf("Failed to initialize job status, err: %v", err))
return err
return nil, err
}

if err := cc.pluginOnJobAdd(job); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PluginError),
fmt.Sprintf("Execute plugin when job add failed, err: %v", err))
return err
return nil, err
}

if err := cc.createPodGroupIfNotExist(job); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PodGroupError),
fmt.Sprintf("Failed to create PodGroup, err: %v", err))
return err
return nil, err
}

newJob, err := cc.createJobIOIfNotExist(job)
if err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PVCError),
fmt.Sprintf("Failed to create PVC, err: %v", err))
return err
}

if updateStatus != nil {
if updateStatus(&newJob.Status) {
newJob.Status.State.LastTransitionTime = metav1.Now()
}
}

newJob2, err := cc.vkClients.BatchV1alpha1().Jobs(newJob.Namespace).UpdateStatus(newJob)
if err != nil {
glog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
if err = cc.cache.Update(newJob2); err != nil {
glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v",
newJob2.Namespace, newJob2.Name, err)
return err
return nil, err
}

return nil
return newJob, nil
}

func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {
Expand All @@ -210,6 +186,11 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
return nil
}

var err error
if job, err = cc.createJob(job); err != nil {
return err
}

var running, pending, terminating, succeeded, failed, unknown int32

var podToCreate []*v1.Pod
Expand Down Expand Up @@ -404,6 +385,8 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) (*vkv1.Job, error) {
job.Namespace, job.Name, err)
return nil, err
}
newJob.Status = job.Status

return newJob, err
}
return job, nil
Expand Down
118 changes: 5 additions & 113 deletions pkg/controllers/job/job_controller_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,119 +165,6 @@ func TestKillJobFunc(t *testing.T) {
}
}

func TestCreateJobFunc(t *testing.T) {
namespace := "test"

testcases := []struct {
Name string
Job *v1alpha1.Job
PodGroup *schedulingv1alpha2.PodGroup
UpdateStatus state.UpdateStatusFn
JobInfo *apis.JobInfo
Plugins []string
ExpextVal error
}{
{
Name: "CreateJob success Case",
Job: &v1alpha1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "job1",
Namespace: namespace,
},
Status: v1alpha1.JobStatus{
State: v1alpha1.JobState{
Phase: v1alpha1.Pending,
},
},
},
PodGroup: &schedulingv1alpha2.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: "job1",
Namespace: namespace,
},
},
UpdateStatus: nil,
JobInfo: &apis.JobInfo{
Namespace: namespace,
Name: "jobinfo1",
},
Plugins: []string{"svc", "ssh", "env"},
ExpextVal: nil,
},
}

for i, testcase := range testcases {

t.Run(testcase.Name, func(t *testing.T) {
fakeController := newFakeController()
jobPlugins := make(map[string][]string)

for _, plugin := range testcase.Plugins {
jobPlugins[plugin] = make([]string, 0)
}
testcase.JobInfo.Job = testcase.Job
testcase.JobInfo.Job.Spec.Plugins = jobPlugins

_, err := fakeController.vkClients.BatchV1alpha1().Jobs(namespace).Create(testcase.Job)
if err != nil {
t.Errorf("Case %d (%s): expected: No Error, but got error %s", i, testcase.Name, err.Error())
}

err = fakeController.cache.Add(testcase.Job)
if err != nil {
t.Error("Error While Adding Job in cache")
}

err = fakeController.createJob(testcase.JobInfo, testcase.UpdateStatus)
if err != nil {
t.Errorf("Case %d (%s): expected: No Error, but got error %s", i, testcase.Name, err.Error())
}

job, err := fakeController.vkClients.BatchV1alpha1().Jobs(namespace).Get(testcase.Job.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("Case %d (%s): expected: No Error, but got error %s", i, testcase.Name, err.Error())
}
for _, plugin := range testcase.Plugins {

if plugin == "svc" {
_, err = fakeController.kubeClients.CoreV1().Services(namespace).Get(testcase.Job.Name, metav1.GetOptions{})
if err != nil {
t.Errorf("Case %d (%s): expected: Service to be created, but not created because of error %s", i, testcase.Name, err.Error())
}

_, err = fakeController.kubeClients.CoreV1().ConfigMaps(namespace).Get(fmt.Sprint(testcase.Job.Name, "-svc"), metav1.GetOptions{})
if err != nil {
t.Errorf("Case %d (%s): expected: Service to be created, but not created because of error %s", i, testcase.Name, err.Error())
}

exist := job.Status.ControlledResources["plugin-svc"]
if exist == "" {
t.Errorf("Case %d (%s): expected: ControlledResources should be added, but not got added", i, testcase.Name)
}
}

if plugin == "ssh" {
_, err := fakeController.kubeClients.CoreV1().ConfigMaps(namespace).Get(fmt.Sprint(testcase.Job.Name, "-ssh"), metav1.GetOptions{})
if err != nil {
t.Errorf("Case %d (%s): expected: ConfigMap to be created, but not created because of error %s", i, testcase.Name, err.Error())
}
exist := job.Status.ControlledResources["plugin-ssh"]
if exist == "" {
t.Errorf("Case %d (%s): expected: ControlledResources should be added, but not got added", i, testcase.Name)
}
}
if plugin == "env" {
exist := job.Status.ControlledResources["plugin-env"]
if exist == "" {
t.Errorf("Case %d (%s): expected: ControlledResources should be added, but not got added", i, testcase.Name)
}
}
}
})

}
}

func TestSyncJobFunc(t *testing.T) {
namespace := "test"

Expand Down Expand Up @@ -321,6 +208,11 @@ func TestSyncJobFunc(t *testing.T) {
},
},
},
Status: v1alpha1.JobStatus{
State: v1alpha1.JobState{
Phase: v1alpha1.Pending,
},
},
},
PodGroup: &schedulingv1alpha2.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Expand Down
4 changes: 1 addition & 3 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func (cc *Controller) updatePodGroup(oldObj, newObj interface{}) {
}

_, err := cc.cache.Get(vkcache.JobKeyByName(newPG.Namespace, newPG.Name))
if err != nil {
if err != nil && newPG.Annotations != nil {
glog.Warningf(
"Failed to find job in cache by PodGroup, this may not be a PodGroup for volcano job.")
}
Expand All @@ -423,8 +423,6 @@ func (cc *Controller) updatePodGroup(oldObj, newObj interface{}) {
switch newPG.Status.Phase {
case kbtype.PodGroupUnknown:
req.Event = vkbatchv1.JobUnknownEvent
case kbtype.PodGroupInqueue:
req.Action = vkbatchv1.EnqueueAction
}
key := vkjobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
Expand Down
Loading

0 comments on commit 9fc4c04

Please sign in to comment.