Skip to content

Commit

Permalink
Merge pull request #787 from hzxuzhonghu/elastic-2
Browse files Browse the repository at this point in the history
Implement job scale up and down
  • Loading branch information
volcano-sh-bot authored May 8, 2020
2 parents 52ad8d2 + 1a0f685 commit 2a495c5
Show file tree
Hide file tree
Showing 20 changed files with 312 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ spec:
description: The number that volcano retried to submit the job.
format: int32
type: integer
ControlledResources:
controlledResources:
description: All of the resources that are controlled by this job.
type: object
additionalProperties:
Expand Down
2 changes: 1 addition & 1 deletion installer/volcano-development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ spec:
description: The number that volcano retried to submit the job.
format: int32
type: integer
ControlledResources:
controlledResources:
description: All of the resources that are controlled by this job.
type: object
additionalProperties:
Expand Down
5 changes: 4 additions & 1 deletion pkg/apis/bus/v1alpha1/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ const (

// Note: events below are used internally, should not be used by users.

// OutOfSyncEvent is triggered if Pod/Job were updated
// OutOfSyncEvent is triggered if Pod/Job is updated(add/update/delete)
OutOfSyncEvent Event = "OutOfSync"

// CommandIssuedEvent is triggered if a command is raised by user
CommandIssuedEvent Event = "CommandIssued"

// JobUpdatedEvent is triggered if Job is updated, currently only scale up/down
JobUpdatedEvent Event = "JobUpdated"
)
50 changes: 14 additions & 36 deletions pkg/apis/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@ import (
"net/http"
"os"
"os/signal"
"reflect"
"syscall"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/mux"
Expand All @@ -52,38 +50,10 @@ var CommandKind = vcbus.SchemeGroupVersion.WithKind("Command")
// V1beta1QueueKind is queue kind with v1beta1 version
var V1beta1QueueKind = schedulerv1beta1.SchemeGroupVersion.WithKind("Queue")

// GetController returns the UID carried by the controller owner reference of
// obj. An empty UID is returned when meta.Accessor cannot handle obj or when
// obj has no controller owner reference.
func GetController(obj interface{}) types.UID {
	objMeta, err := meta.Accessor(obj)
	if err != nil {
		return ""
	}

	if ref := metav1.GetControllerOf(objMeta); ref != nil {
		return ref.UID
	}
	return ""
}

// ControlledBy reports whether obj has a controller owner reference whose Kind
// equals gvk.Kind. Only the Kind is compared; the group and version of gvk are
// not examined. It returns false when meta.Accessor cannot handle obj or when
// obj has no controller owner reference.
func ControlledBy(obj interface{}, gvk schema.GroupVersionKind) bool {
	objMeta, err := meta.Accessor(obj)
	if err != nil {
		return false
	}

	ref := metav1.GetControllerOf(objMeta)
	return ref != nil && ref.Kind == gvk.Kind
}

// CreateConfigMapIfNotExist creates config map resource if not present
func CreateConfigMapIfNotExist(job *vcbatch.Job, kubeClients kubernetes.Interface, data map[string]string, cmName string) error {
// CreateOrUpdateConfigMap:
// 1. creates the config map resource if not present
// 2. updates the config map if necessary
func CreateOrUpdateConfigMap(job *vcbatch.Job, kubeClients kubernetes.Interface, data map[string]string, cmName string) error {
// If ConfigMap does not exist, create one for Job.
cmOld, err := kubeClients.CoreV1().ConfigMaps(job.Namespace).Get(cmName, metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -112,6 +82,11 @@ func CreateConfigMapIfNotExist(job *vcbatch.Job, kubeClients kubernetes.Interfac
return nil
}

// no changes
if reflect.DeepEqual(cmOld.Data, data) {
return nil
}

cmOld.Data = data
if _, err := kubeClients.CoreV1().ConfigMaps(job.Namespace).Update(cmOld); err != nil {
klog.V(3).Infof("Failed to update ConfigMap for Job <%s/%s>: %v",
Expand All @@ -136,6 +111,9 @@ func CreateSecret(job *vcbatch.Job, kubeClients kubernetes.Interface, data map[s
}

_, err := kubeClients.CoreV1().Secrets(job.Namespace).Create(secret)
if apierrors.IsAlreadyExists(err) {
return nil
}

return err
}
Expand Down
66 changes: 61 additions & 5 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (cc *Controller) initiateJob(job *batch.Job) (*batch.Job, error) {
return nil, err
}

if err := cc.createPodGroupIfNotExist(newJob); err != nil {
if err := cc.createOrUpdatePodGroup(newJob); err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PodGroupError),
fmt.Sprintf("Failed to create PodGroup, err: %v", err))
return nil, err
Expand All @@ -175,6 +175,27 @@ func (cc *Controller) initiateJob(job *batch.Job) (*batch.Job, error) {
return newJob, nil
}

// initOnJobUpdate performs the initialization steps needed when an existing
// job is synced again: it runs the OnJobUpdate hook of the job's plugins and
// then creates or updates the job's PodGroup.
//
// If either step fails, a warning event describing the failed step is recorded
// on the job and the error is returned; the PodGroup step is not attempted
// when the plugin step fails. It returns nil when both steps succeed.
func (cc *Controller) initOnJobUpdate(job *batch.Job) error {
	klog.V(3).Infof("Starting to initiate Job <%s/%s> on update", job.Namespace, job.Name)
	defer klog.V(3).Infof("Finished Job <%s/%s> initiate on update", job.Namespace, job.Name)

	klog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)

	if err := cc.pluginOnJobUpdate(job); err != nil {
		// This is the update path, so the event must not claim the failure
		// happened while adding the job.
		cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PluginError),
			fmt.Sprintf("Execute plugin when job update failed, err: %v", err))
		return err
	}

	if err := cc.createOrUpdatePodGroup(job); err != nil {
		// The call may have failed on either the create or the update branch.
		cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PodGroupError),
			fmt.Sprintf("Failed to create or update PodGroup, err: %v", err))
		return err
	}

	return nil
}

func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {
klog.V(3).Infof("Starting to sync up Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name)
defer klog.V(3).Infof("Finished Job <%s/%s> sync up", jobInfo.Job.Namespace, jobInfo.Job.Name)
Expand All @@ -194,6 +215,12 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
if job, err = cc.initiateJob(job); err != nil {
return err
}
} else {
var err error
// TODO: optimize this; call it only when scaling up/down
if err = cc.initOnJobUpdate(job); err != nil {
return err
}
}

var syncTask bool
Expand Down Expand Up @@ -300,8 +327,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate))
}

// TODO: Can hardly imagine when this is necessary.
// Delete unnecessary pods.
// Delete pods when scale down.
waitDeletionGroup := sync.WaitGroup{}
waitDeletionGroup.Add(len(podToDelete))
for _, pod := range podToDelete {
Expand Down Expand Up @@ -453,9 +479,10 @@ func (cc *Controller) createPVC(job *batch.Job, vcName string, volumeClaim *v1.P
return nil
}

func (cc *Controller) createPodGroupIfNotExist(job *batch.Job) error {
func (cc *Controller) createOrUpdatePodGroup(job *batch.Job) error {
// If PodGroup does not exist, create one for Job.
if _, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); err != nil {
pg, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name)
if err != nil {
if !apierrors.IsNotFound(err) {
klog.V(3).Infof("Failed to get PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
Expand Down Expand Up @@ -485,6 +512,18 @@ func (cc *Controller) createPodGroupIfNotExist(job *batch.Job) error {
return err
}
}
return nil
}

if pg.Spec.MinMember != job.Spec.MinAvailable {
pg.Spec.MinMember = job.Spec.MinAvailable
if _, err = cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Update(pg); err != nil {
if !apierrors.IsAlreadyExists(err) {
klog.V(3).Infof("Failed to create PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
}
}

return nil
Expand Down Expand Up @@ -541,6 +580,7 @@ func (cc *Controller) initJobStatus(job *batch.Job) (*batch.Job, error) {
return job, nil
}

job.Status.State.LastTransitionTime = metav1.Now()
job.Status.State.Phase = batch.Pending
job.Status.State.LastTransitionTime = metav1.Now()
job.Status.MinAvailable = job.Spec.MinAvailable
Expand All @@ -559,6 +599,22 @@ func (cc *Controller) initJobStatus(job *batch.Job) (*batch.Job, error) {
return newJob, nil
}

// updateJobStatus writes job's status through the batch v1alpha1 client's
// UpdateStatus call and then stores the returned job in the controller cache.
//
// It returns the job returned by UpdateStatus on success. If either the
// status update or the cache update fails, the error is logged and returned
// together with a nil job.
func (cc *Controller) updateJobStatus(job *batch.Job) (*batch.Job, error) {
	newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job)
	if err != nil {
		klog.Errorf("Failed to update status of Job %v/%v: %v",
			job.Namespace, job.Name, err)
		return nil, err
	}

	// Keep the cache in step with what the status update returned.
	if err = cc.cache.Update(newJob); err != nil {
		klog.Errorf("UpdateJobStatus - Failed to update Job %v/%v in cache: %v",
			newJob.Namespace, newJob.Name, err)
		return nil, err
	}

	return newJob, nil
}

func classifyAndAddUpPodBaseOnPhase(pod *v1.Pod, pending, running, succeeded, failed, unknown *int32) {
switch pod.Status.Phase {
case v1.PodPending:
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/job/job_controller_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"fmt"
"testing"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
Expand Down Expand Up @@ -409,7 +409,7 @@ func TestCreatePodGroupIfNotExistFunc(t *testing.T) {
t.Run(testcase.Name, func(t *testing.T) {
fakeController := newFakeController()

err := fakeController.createPodGroupIfNotExist(testcase.Job)
err := fakeController.createOrUpdatePodGroup(testcase.Job)
if err != testcase.ExpextVal {
t.Errorf("Expected return value to be equal to expected: %s, but got: %s", testcase.ExpextVal, err)
}
Expand Down
13 changes: 6 additions & 7 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ package job

import (
"fmt"
"k8s.io/api/core/v1"
"reflect"
"strconv"

v1 "k8s.io/api/core/v1"
"k8s.io/api/scheduling/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
"reflect"
"strconv"

batch "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
bus "volcano.sh/volcano/pkg/apis/bus/v1alpha1"
Expand Down Expand Up @@ -85,7 +86,7 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) {
// NOTE: Since we only reconcile job based on Spec, we will ignore other attributes
// For Job status, it's used internally and always been updated via our controller.
if reflect.DeepEqual(newJob.Spec, oldJob.Spec) && newJob.Status.State.Phase == oldJob.Status.State.Phase {
klog.Infof("Job update event is ignored since no update in 'Spec'.")
klog.V(6).Infof("Job update event is ignored since no update in 'Spec'.")
return
}

Expand All @@ -97,10 +98,8 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) {
req := apis.Request{
Namespace: newJob.Namespace,
JobName: newJob.Name,

Event: bus.OutOfSyncEvent,
Event: bus.OutOfSyncEvent,
}

key := jobhelpers.GetJobKeyByReq(&req)
queue := cc.getWorkerQueue(key)
queue.Add(req)
Expand Down
27 changes: 25 additions & 2 deletions pkg/controllers/job/job_controller_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ package job
import (
"fmt"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/klog"

batch "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/controllers/job/plugins"
"volcano.sh/volcano/pkg/controllers/job/plugins/interface"
pluginsinterface "volcano.sh/volcano/pkg/controllers/job/plugins/interface"
)

func (cc *Controller) pluginOnPodCreate(job *batch.Job, pod *v1.Pod) error {
Expand Down Expand Up @@ -88,3 +88,26 @@ func (cc *Controller) pluginOnJobDelete(job *batch.Job) error {

return nil
}

// pluginOnJobUpdate invokes the OnJobUpdate hook of every plugin listed in
// job.Spec.Plugins, building each plugin with the controller's kube client
// and the arguments configured for it.
//
// job.Status.ControlledResources is initialized to an empty map when it is
// nil. The first failure, either an unknown plugin name or a hook returning
// an error, is logged and returned, and the remaining plugins are skipped.
func (cc *Controller) pluginOnJobUpdate(job *batch.Job) error {
	if job.Status.ControlledResources == nil {
		job.Status.ControlledResources = make(map[string]string)
	}

	pluginClient := pluginsinterface.PluginClientset{KubeClients: cc.kubeClient}
	for pluginName, pluginArgs := range job.Spec.Plugins {
		build, ok := plugins.GetPluginBuilder(pluginName)
		if !ok {
			err := fmt.Errorf("failed to get plugin %s", pluginName)
			klog.Error(err)
			return err
		}

		klog.Infof("Starting to execute plugin at <pluginOnJobUpdate>: %s on job: <%s/%s>", pluginName, job.Namespace, job.Name)
		plugin := build(pluginClient, pluginArgs)
		if err := plugin.OnJobUpdate(job); err != nil {
			klog.Errorf("Failed to process on job update plugin %s, err %v.", pluginName, err)
			return err
		}
	}

	return nil
}
8 changes: 6 additions & 2 deletions pkg/controllers/job/plugins/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ limitations under the License.
package env

import (
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"

batch "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers"
"volcano.sh/volcano/pkg/controllers/job/plugins/interface"
pluginsinterface "volcano.sh/volcano/pkg/controllers/job/plugins/interface"
)

type envPlugin struct {
Expand Down Expand Up @@ -73,3 +73,7 @@ func (ep *envPlugin) OnJobAdd(job *batch.Job) error {
func (ep *envPlugin) OnJobDelete(job *batch.Job) error {
return nil
}

// OnJobUpdate is the env plugin's job-update hook. It does nothing and always
// returns nil.
func (ep *envPlugin) OnJobUpdate(job *batch.Job) error {
return nil
}
5 changes: 4 additions & 1 deletion pkg/controllers/job/plugins/interface/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package pluginsinterface

import (
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"

vcbatch "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
Expand All @@ -41,4 +41,7 @@ type PluginInterface interface {

// do once when killJob
OnJobDelete(job *vcbatch.Job) error

// do once when updateJob
OnJobUpdate(job *vcbatch.Job) error
}
5 changes: 4 additions & 1 deletion pkg/controllers/job/plugins/ssh/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"fmt"

"golang.org/x/crypto/ssh"

v1 "k8s.io/api/core/v1"
"k8s.io/klog"

Expand Down Expand Up @@ -98,6 +97,10 @@ func (sp *sshPlugin) OnJobDelete(job *batch.Job) error {
return helpers.DeleteSecret(job, sp.client.KubeClients, sp.secretName(job))
}

// OnJobUpdate is the ssh plugin's job-update hook. The job argument is
// ignored; the method does nothing and always returns nil.
func (sp *sshPlugin) OnJobUpdate(_ *batch.Job) error {
return nil
}

func (sp *sshPlugin) mountRsaKey(pod *v1.Pod, job *batch.Job) {
secretName := sp.secretName(job)

Expand Down
Loading

0 comments on commit 2a495c5

Please sign in to comment.