
Commit

Merge branch 'main' of github.com:kubernetes-sigs/jobset
googs1025 committed Mar 29, 2024
2 parents f651f14 + c18e14c commit 09ef851
Showing 6 changed files with 131 additions and 364 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -8,7 +8,7 @@

JobSet is a Kubernetes-native API for managing a group of [k8s Jobs](https://kubernetes.io/docs/concepts/workloads/controllers/job/) as a unit. It aims to offer a unified API for deploying HPC (e.g., MPI) and AI/ML training workloads (PyTorch, Jax, Tensorflow etc.) on Kubernetes.

Take a look at the [concepts](/docs/concepts/README.md) page for a brief description of how to use JobSet.
Take a look at the [concepts](https://jobset.sigs.k8s.io/docs/concepts/) page for a brief description of how to use JobSet.

## Conceptual Diagram
<img src="site/static/images/jobset_diagram.png" alt="jobset diagram">
5 changes: 5 additions & 0 deletions pkg/constants/constants.go
@@ -53,4 +53,9 @@ const (
// the same topology domain as the leader pod for that Job).
ExclusivePlacementViolationReason = "ExclusivePlacementViolation"
ExclusivePlacementViolationMessage = "Pod violated JobSet exclusive placement policy"

// Event reason and messages related to startup policy.
InOrderStartupPolicyReason = "StartupPolicyInOrder"
InOrderStartupPolicyExecutingMessage = "in order startup policy is executing"
InOrderStartupPolicyCompletedMessage = "in order startup policy has completed"
)
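
The new reason and message constants are meant to be surfaced as Kubernetes events while an in-order startup policy is rolling out. Below is a minimal sketch of how a controller could emit them through a client-go event recorder; the helper name, its arguments, and the sigs.k8s.io/jobset/pkg/constants import path are assumptions, not part of this commit:

```go
package example

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/tools/record"

	"sigs.k8s.io/jobset/pkg/constants"
)

// emitStartupPolicyEvent is a hypothetical helper: it records a Normal event
// with the new startup-policy reason, choosing the executing or completed
// message depending on whether the in-order rollout has finished.
func emitStartupPolicyEvent(rec record.EventRecorder, obj runtime.Object, completed bool) {
	msg := constants.InOrderStartupPolicyExecutingMessage
	if completed {
		msg = constants.InOrderStartupPolicyCompletedMessage
	}
	rec.Event(obj, corev1.EventTypeNormal, constants.InOrderStartupPolicyReason, msg)
}
```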
163 changes: 99 additions & 64 deletions pkg/controllers/jobset_controller.go
@@ -143,6 +143,12 @@ func (r *JobSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}
}

// If pod DNS hostnames are enabled, create a headless service for the JobSet
if err := r.createHeadlessSvcIfNecessary(ctx, &js); err != nil {
log.Error(err, "creating headless service")
return ctrl.Result{}, err
}

// If job has not failed or succeeded, continue creating any
// jobs that are ready to be started.
if err := r.createJobs(ctx, &js, ownedJobs, status); err != nil {
@@ -153,12 +159,12 @@ func (r *JobSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
// Handle suspending a jobset or resuming a suspended jobset.
jobsetSuspended := jobSetSuspended(&js)
if jobsetSuspended {
if err := r.suspendJobSet(ctx, &js, ownedJobs); err != nil {
if err := r.suspendJobs(ctx, &js, ownedJobs.active); err != nil {
log.Error(err, "suspending jobset")
return ctrl.Result{}, err
}
} else {
if err := r.resumeJobSetIfNecessary(ctx, &js, ownedJobs, status); err != nil {
if err := r.resumeJobsIfNecessary(ctx, &js, ownedJobs.active, status); err != nil {
log.Error(err, "resuming jobset")
return ctrl.Result{}, err
}
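
The jobSetSuspended check used above is not shown in this hunk. A plausible sketch, mirroring the jobSuspended helper visible near the end of this file and reusing its ptr and jobset imports; the real implementation may differ:

```go
// Likely shape of jobSetSuspended: a JobSet is treated as suspended when
// spec.suspend is set to true (defaults to false when unset).
func jobSetSuspended(js *jobset.JobSet) bool {
	return ptr.Deref(js.Spec.Suspend, false)
}
```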
@@ -304,8 +310,8 @@ func (r *JobSetReconciler) calculateReplicatedJobStatuses(ctx context.Context, j
return rjStatus
}

func (r *JobSetReconciler) suspendJobSet(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs) error {
for _, job := range ownedJobs.active {
func (r *JobSetReconciler) suspendJobs(ctx context.Context, js *jobset.JobSet, activeJobs []*batchv1.Job) error {
for _, job := range activeJobs {
if !jobSuspended(job) {
job.Spec.Suspend = ptr.To(true)
if err := r.Update(ctx, job); err != nil {
@@ -326,17 +332,16 @@ func (r *JobSetReconciler) suspendJobSet(ctx context.Context, js *jobset.JobSet,
})
}

func (r *JobSetReconciler) resumeJobSetIfNecessary(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs, replicatedJobStatuses []jobset.ReplicatedJobStatus) error {

func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset.JobSet, activeJobs []*batchv1.Job, replicatedJobStatuses []jobset.ReplicatedJobStatus) error {
// Store node selector for each replicatedJob template.
nodeAffinities := map[string]map[string]string{}
// As an optimization we are only going to store the jobs that need
// to be resumed.
replicatedJobToActiveJobs := map[string][]*batchv1.Job{}
for _, replicatedJob := range js.Spec.ReplicatedJobs {
nodeAffinities[replicatedJob.Name] = replicatedJob.Template.Spec.Template.Spec.NodeSelector
}

for _, job := range ownedJobs.active {
// Map each replicatedJob to a list of its active jobs.
replicatedJobToActiveJobs := map[string][]*batchv1.Job{}
for _, job := range activeJobs {
replicatedJobName := job.Labels[jobset.ReplicatedJobNameKey]
replicatedJobToActiveJobs[replicatedJobName] = append(replicatedJobToActiveJobs[replicatedJobName], job)
}
@@ -345,8 +350,9 @@ func (r *JobSetReconciler) resumeJobSetIfNecessary(ctx context.Context, js *jobs
// If JobSpec is unsuspended, ensure all active child Jobs are also
// unsuspended and update the suspend condition to true.
for _, replicatedJob := range js.Spec.ReplicatedJobs {
replicatedJobStatus := findReplicatedStatus(replicatedJobStatuses, replicatedJob.Name)
if inOrderStartupPolicy(startupPolicy) && replicatedJobsStarted(replicatedJob.Replicas, replicatedJobStatus) {
replicatedJobStatus := findReplicatedJobStatus(replicatedJobStatuses, replicatedJob.Name)
// If this replicatedJob has already started, continue.
if inOrderStartupPolicy(startupPolicy) && allReplicasStarted(replicatedJob.Replicas, replicatedJobStatus) {
continue
}
jobsFromRJob := replicatedJobToActiveJobs[replicatedJob.Name]
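
allReplicasStarted replaces the earlier replicatedJobsStarted check, but its body does not appear in this diff. A sketch inferred from the call sites, assuming the ReplicatedJobStatus Ready and Succeeded counters; the actual comparison may differ:

```go
// Inferred sketch: a replicated job counts as fully started once every replica
// is either ready or has already succeeded.
func allReplicasStarted(replicas int32, rjStatus jobset.ReplicatedJobStatus) bool {
	return rjStatus.Ready+rjStatus.Succeeded >= replicas
}
```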
@@ -355,22 +361,32 @@ func (r *JobSetReconciler) resumeJobSetIfNecessary(ctx context.Context, js *jobs
return err
}
}
// If using in order startup policy, return early and wait for the replicated job to be ready.
if inOrderStartupPolicy(startupPolicy) {
// Add a condition to the JobSet indicating the in order startup policy is executing.
return r.ensureCondition(ctx, ensureConditionOpts{
jobset: js,
eventType: corev1.EventTypeNormal,
forceFalseUpdate: true,
condition: generateStartupPolicyCondition(metav1.ConditionFalse),
condition: inOrderStartupPolicyExecutingCondition(),
})
}
}
// At this point all replicated jobs have had their jobs resumed.
// If using an in order startup policy, add a condition to the JobSet indicating the
// in order startup policy has completed.
if inOrderStartupPolicy(startupPolicy) {
return r.ensureCondition(ctx, ensureConditionOpts{
if err := r.ensureCondition(ctx, ensureConditionOpts{
jobset: js,
eventType: corev1.EventTypeNormal,
condition: generateStartupPolicyCondition(metav1.ConditionTrue),
})
condition: inOrderStartupPolicyCompletedCondition(),
}); err != nil {
return err
}
}

// Finally, set the suspended condition on the JobSet to false to indicate
// the JobSet is no longer suspended.
return r.ensureCondition(ctx, ensureConditionOpts{
jobset: js,
eventType: corev1.EventTypeNormal,
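
The renamed helpers inOrderStartupPolicyExecutingCondition and inOrderStartupPolicyCompletedCondition are defined elsewhere in this change. A rough sketch of what they plausibly return, built from the constants added in pkg/constants and this file's metav1 import; the condition type string is an assumption, not a value taken from this diff:

```go
// Assumed condition type name; the real constant lives in the API package.
const startupPolicyCompletedType = "StartupPolicyCompleted"

func inOrderStartupPolicyExecutingCondition() metav1.Condition {
	return metav1.Condition{
		Type:    startupPolicyCompletedType,
		Status:  metav1.ConditionFalse, // still rolling replicated jobs out one by one
		Reason:  constants.InOrderStartupPolicyReason,
		Message: constants.InOrderStartupPolicyExecutingMessage,
	}
}

func inOrderStartupPolicyCompletedCondition() metav1.Condition {
	return metav1.Condition{
		Type:    startupPolicyCompletedType,
		Status:  metav1.ConditionTrue, // every replicated job has started
		Reason:  constants.InOrderStartupPolicyReason,
		Message: constants.InOrderStartupPolicyCompletedMessage,
	}
}
```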
@@ -411,12 +427,6 @@ func (r *JobSetReconciler) resumeJob(ctx context.Context, job *batchv1.Job, node
func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ownedJobs *childJobs, replicatedJobStatus []jobset.ReplicatedJobStatus) error {
log := ctrl.LoggerFrom(ctx)

// If pod DNS hostnames are enabled, create a headless service for the JobSet
if dnsHostnamesEnabled(js) {
if err := r.createHeadlessSvcIfNotExist(ctx, js); err != nil {
return err
}
}
startupPolicy := js.Spec.StartupPolicy
var lock sync.Mutex
var finalErrs []error
@@ -426,13 +436,14 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow
return err
}

status := findReplicatedStatus(replicatedJobStatus, replicatedJob.Name)
rjJobStarted := replicatedJobsStarted(replicatedJob.Replicas, status)
status := findReplicatedJobStatus(replicatedJobStatus, replicatedJob.Name)

// For the in order startup policy, skip this replicated job if all of its
// replicas have already started; its jobs have already been created.
if !jobSetSuspended(js) && inOrderStartupPolicy(startupPolicy) && rjJobStarted {
if !jobSetSuspended(js) && inOrderStartupPolicy(startupPolicy) && allReplicasStarted(replicatedJob.Replicas, status) {
continue
}

workqueue.ParallelizeUntil(ctx, constants.MaxParallelism, len(jobs), func(i int) {
job := jobs[i]

@@ -454,6 +465,7 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow
}
log.V(2).Info("successfully created job", "job", klog.KObj(job))
})

// If we are using the inOrder StartupPolicy, return here to wait for the jobs to be ready.
// This updates the StartupPolicy condition and signals that we are waiting
// for this replicated job to become ready.
@@ -462,7 +474,7 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow
jobset: js,
eventType: corev1.EventTypeNormal,
forceFalseUpdate: true,
condition: generateStartupPolicyCondition(metav1.ConditionFalse),
condition: inOrderStartupPolicyExecutingCondition(),
})
}
}
@@ -478,17 +490,48 @@ func (r *JobSetReconciler) createJobs(ctx context.Context, js *jobset.JobSet, ow
return r.ensureCondition(ctx, ensureConditionOpts{
jobset: js,
eventType: corev1.EventTypeNormal,
condition: generateStartupPolicyCondition(metav1.ConditionTrue),
condition: inOrderStartupPolicyCompletedCondition(),
})
}
return allErrs
}

func (r *JobSetReconciler) deleteJobs(ctx context.Context, jobsForDeletion []*batchv1.Job) error {
log := ctrl.LoggerFrom(ctx)
lock := &sync.Mutex{}
var finalErrs []error
workqueue.ParallelizeUntil(ctx, constants.MaxParallelism, len(jobsForDeletion), func(i int) {
targetJob := jobsForDeletion[i]
// Skip deleting jobs with deletion timestamp already set.
if targetJob.DeletionTimestamp != nil {
return
}
// Delete job. This deletion event will trigger another reconciliation,
// where the jobs are recreated.
foregroundPolicy := metav1.DeletePropagationForeground
if err := r.Delete(ctx, targetJob, &client.DeleteOptions{PropagationPolicy: &foregroundPolicy}); client.IgnoreNotFound(err) != nil {
lock.Lock()
defer lock.Unlock()
log.Error(err, fmt.Sprintf("failed to delete job: %q", targetJob.Name))
finalErrs = append(finalErrs, err)
return
}
log.V(2).Info("successfully deleted job", "job", klog.KObj(targetJob), "restart attempt", targetJob.Labels[targetJob.Labels[constants.RestartsKey]])
})
return errors.Join(finalErrs...)
}

// TODO: look into adopting the service and updating the selector
// if it does not match the job selector.
func (r *JobSetReconciler) createHeadlessSvcIfNotExist(ctx context.Context, js *jobset.JobSet) error {
func (r *JobSetReconciler) createHeadlessSvcIfNecessary(ctx context.Context, js *jobset.JobSet) error {
log := ctrl.LoggerFrom(ctx)

// A headless service is only necessary for indexed jobs whose pods need to communicate with
// each other via pod hostnames.
if !dnsHostnamesEnabled(js) {
return nil
}

// Check if service already exists. The service name should match the subdomain specified in
// Spec.Network.Subdomain, with a default of <jobSetName> set by the webhook.
// If the service doesn't exist in the same namespace, create it.
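
For context, the headless Service this function manages would look roughly like the sketch below: named after GetSubdomain(js), selecting the JobSet's pods by the jobset name label, and with no cluster IP so per-pod DNS records are created. The exact fields are illustrative, not a copy of the controller's object construction, and the sketch reuses this file's corev1, metav1, and jobset imports:

```go
// Illustrative only: roughly the shape of the headless Service created when
// pod DNS hostnames are enabled.
func sketchHeadlessService(js *jobset.JobSet) *corev1.Service {
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      GetSubdomain(js), // defaults to the JobSet name via the webhook
			Namespace: js.Namespace,
		},
		Spec: corev1.ServiceSpec{
			ClusterIP: corev1.ClusterIPNone, // headless: per-pod DNS records, no virtual IP
			Selector: map[string]string{
				jobset.JobSetNameKey: js.Name,
			},
		},
	}
}
```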
@@ -574,29 +617,17 @@ func (r *JobSetReconciler) failurePolicyRecreateAll(ctx context.Context, js *job
return nil
}

func (r *JobSetReconciler) deleteJobs(ctx context.Context, jobsForDeletion []*batchv1.Job) error {
log := ctrl.LoggerFrom(ctx)
lock := &sync.Mutex{}
var finalErrs []error
workqueue.ParallelizeUntil(ctx, constants.MaxParallelism, len(jobsForDeletion), func(i int) {
targetJob := jobsForDeletion[i]
// Skip deleting jobs with deletion timestamp already set.
if targetJob.DeletionTimestamp != nil {
return
}
// Delete job. This deletion event will trigger another reconciliation,
// where the jobs are recreated.
foregroundPolicy := metav1.DeletePropagationForeground
if err := r.Delete(ctx, targetJob, &client.DeleteOptions{PropagationPolicy: &foregroundPolicy}); client.IgnoreNotFound(err) != nil {
lock.Lock()
defer lock.Unlock()
log.Error(err, fmt.Sprintf("failed to delete job: %q", targetJob.Name))
finalErrs = append(finalErrs, err)
return
}
log.V(2).Info("successfully deleted job", "job", klog.KObj(targetJob), "restart attempt", targetJob.Labels[targetJob.Labels[constants.RestartsKey]])
func (r *JobSetReconciler) failJobSet(ctx context.Context, js *jobset.JobSet, reason, msg string) error {
return r.ensureCondition(ctx, ensureConditionOpts{
jobset: js,
condition: metav1.Condition{
Type: string(jobset.JobSetFailed),
Status: metav1.ConditionStatus(corev1.ConditionTrue),
Reason: reason,
Message: msg,
},
eventType: corev1.EventTypeWarning,
})
return errors.Join(finalErrs...)
}

// updateStatus updates the status of a JobSet.
@@ -630,19 +661,6 @@ func (r *JobSetReconciler) ensureCondition(ctx context.Context, opts ensureCondi
return nil
}
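
The ensureConditionOpts type is not visible in this diff; below is its plausible shape, inferred from the call sites above (field names match the call sites, field types are assumptions):

```go
// Inferred from call sites: the JobSet to update, the condition to apply, the
// event type to emit, and a flag forcing an update even when the condition
// status is False (used while a startup policy is still executing).
type ensureConditionOpts struct {
	jobset           *jobset.JobSet
	condition        metav1.Condition
	eventType        string
	forceFalseUpdate bool
}
```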

func (r *JobSetReconciler) failJobSet(ctx context.Context, js *jobset.JobSet, reason, msg string) error {
return r.ensureCondition(ctx, ensureConditionOpts{
jobset: js,
condition: metav1.Condition{
Type: string(jobset.JobSetFailed),
Status: metav1.ConditionStatus(corev1.ConditionTrue),
Reason: reason,
Message: msg,
},
eventType: corev1.EventTypeWarning,
})
}

func updateCondition(js *jobset.JobSet, condition metav1.Condition, forceFalseUpdate bool) bool {
condition.LastTransitionTime = metav1.Now()
for i, val := range js.Status.Conditions {
@@ -749,8 +767,21 @@ func shouldCreateJob(jobName string, ownedJobs *childJobs) bool {
return true
}

// labelAndAnnotateObject adds standard JobSet-related labels and annotations to a k8s object.
// In practice it is used to label and annotate child Jobs and pods.
// The same set of labels is also added as annotations for simplicity's sake.
// The two exceptions to this are:
// 1. "alpha.jobset.sigs.k8s.io/exclusive-topology" which is
// a JobSet annotation optionally added by the user, so we only add it as an annotation
// to child Jobs and pods if it is defined, and do not add it as a label.
// 2. "alpha.jobset.sigs.k8s.io/node-selector" which is another optional
// annotation applied by the user to indicate they are using the
// nodeSelector exclusive placement strategy, where they have manually
// labeled the nodes ahead of time with hack/label_nodes/label_nodes.py.
func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.ReplicatedJob, jobIdx int) {
jobName := placement.GenJobName(js.Name, rjob.Name, jobIdx)

// Set labels on the object.
labels := collections.CloneMap(obj.GetLabels())
labels[jobset.JobSetNameKey] = js.Name
labels[jobset.ReplicatedJobNameKey] = rjob.Name
@@ -759,6 +790,7 @@ func labelAndAnnotateObject(obj metav1.Object, js *jobset.JobSet, rjob *jobset.R
labels[jobset.JobIndexKey] = strconv.Itoa(jobIdx)
labels[jobset.JobKey] = jobHashKey(js.Namespace, jobName)

// Set annotations on the object.
annotations := collections.CloneMap(obj.GetAnnotations())
annotations[jobset.JobSetNameKey] = js.Name
annotations[jobset.ReplicatedJobNameKey] = rjob.Name
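
As a concrete, hypothetical illustration of the scheme described in the doc comment above: the Job with index 1 of a replicated job named "workers" in a JobSet named "train" on restart attempt 0 would carry labels along these lines (reusing this file's jobset and constants imports; the jobset.JobKey value is a hash of the namespaced job name in the real code and is shown here only as a placeholder):

```go
// Hypothetical example values; the keys are the ones set by labelAndAnnotateObject.
var exampleChildJobLabels = map[string]string{
	jobset.JobSetNameKey:        "train",
	jobset.ReplicatedJobNameKey: "workers",
	constants.RestartsKey:       "0",
	jobset.JobIndexKey:          "1",
	jobset.JobKey:               "<hash of train/train-workers-1>",
}
```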
@@ -808,6 +840,9 @@ func GetSubdomain(js *jobset.JobSet) string {
return js.Name
}

// addNamespacedJobNodeSelector adds the namespaced job name as a nodeSelector for use by the
// nodeSelector exclusive job placement strategy, where the user has labeled nodes ahead of time
// with one job name label per node pool using hack/label_nodes/label_nodes.py.
func addNamespacedJobNodeSelector(job *batchv1.Job) {
if job.Spec.Template.Spec.NodeSelector == nil {
job.Spec.Template.Spec.NodeSelector = make(map[string]string)
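
The rest of addNamespacedJobNodeSelector is cut off here. A hedged sketch of how it plausibly continues; both the node selector key and the namespaced-name format are assumptions, not values taken from this diff:

```go
// Hedged sketch of the full function: select only nodes that were pre-labeled
// for this job by hack/label_nodes/label_nodes.py. The label key and value
// format below are hypothetical placeholders.
func addNamespacedJobNodeSelectorSketch(job *batchv1.Job) {
	if job.Spec.Template.Spec.NodeSelector == nil {
		job.Spec.Template.Spec.NodeSelector = make(map[string]string)
	}
	job.Spec.Template.Spec.NodeSelector["alpha.jobset.sigs.k8s.io/namespaced-job"] = job.Namespace + "_" + job.Name
}
```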
@@ -854,7 +889,7 @@ func jobSuspended(job *batchv1.Job) bool {
return ptr.Deref(job.Spec.Suspend, false)
}

func findReplicatedStatus(replicatedJobStatus []jobset.ReplicatedJobStatus, replicatedJobName string) jobset.ReplicatedJobStatus {
func findReplicatedJobStatus(replicatedJobStatus []jobset.ReplicatedJobStatus, replicatedJobName string) jobset.ReplicatedJobStatus {
for _, status := range replicatedJobStatus {
if status.Name == replicatedJobName {
return status