Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: READY and ACTIVE fields of ScaledJob to show status. #1855

Merged
merged 4 commits into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
- Fix cleanup of removed triggers ([#1768](https://github.com/kedacore/keda/pull/1768))
- Eventhub Scaler: Add trigger parameter `checkpointStrategy` to support more language-specific checkpoints ([#1621](https://github.com/kedacore/keda/pull/1621))
- Fix Azure Blob scaler when using multiple triggers with the same `blobContainerName` and added a optional `metricName` field ([#1816](https://github.com/kedacore/keda/pull/1816))
- Fix READY and ACTIVE fields of ScaledJob to show status when we run `kubectl get sj` ([#1855](https://github.com/kedacore/keda/pull/1855))
zroubalik marked this conversation as resolved.
Show resolved Hide resolved

### Breaking Changes

Expand Down
61 changes: 39 additions & 22 deletions controllers/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

kedacontrollerutil "github.com/kedacore/keda/v2/controllers/util"
"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"

Expand Down Expand Up @@ -80,47 +81,63 @@ func (r *ScaledJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, err
}

var errMsg string
if scaledJob.Spec.JobTargetRef != nil {
reqLogger.Info("Detected ScaleType = Job")
conditions := scaledJob.Status.Conditions.DeepCopy()
msg, err := r.reconcileScaledJob(reqLogger, scaledJob)
if err != nil {
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed")
r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobCheckFailed, msg)
} else {
wasReady := conditions.GetReadyCondition()
if wasReady.IsFalse() || wasReady.IsUnknown() {
r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobReady, "ScaledJob is ready for scaling")
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg)
// ensure Status Conditions are initialized
if !scaledJob.Status.Conditions.AreInitialized() {
conditions := kedav1alpha1.GetInitializedConditions()
if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledJob, conditions); err != nil {
return ctrl.Result{}, err
}
}

// Check jobTargetRef is specified
if scaledJob.Spec.JobTargetRef == nil {
errMsg := "scaledJob.spec.jobTargetRef is not set"
err := fmt.Errorf(errMsg)
reqLogger.Error(err, "scaledJob.spec.jobTargetRef not found")
return ctrl.Result{}, err
}
msg, err := r.reconcileScaledJob(reqLogger, scaledJob)
conditions := scaledJob.Status.Conditions.DeepCopy()
if err != nil {
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed")
r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobCheckFailed, msg)
} else {
wasReady := conditions.GetReadyCondition()
if wasReady.IsFalse() || wasReady.IsUnknown() {
r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobReady, "ScaledJob is ready for scaling")
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg)
}

errMsg = "scaledJob.Spec.JobTargetRef is not set"
err = fmt.Errorf(errMsg)
reqLogger.Error(err, "scaledJob.Spec.JobTargetRef not found")
if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledJob, &conditions); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, err
}

// reconcileJobType implemets reconciler logic for K8s Jobs based ScaleObject
// reconcileScaledJob implements reconciler logic for K8s Jobs based ScaledJob
func (r *ScaledJobReconciler) reconcileScaledJob(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {
msg, err := r.deletePreviousVersionScaleJobs(logger, scaledJob)
if err != nil {
return msg, err
}

// Check ScaledJob is Ready or not
_, err = r.scaleHandler.GetScalers(scaledJob)
if err != nil {
logger.Error(err, "Error getting scalers")
return "Failed to ensure ScaledJob is correctly created", err
}

// scaledJob was created or modified - let's start a new ScaleLoop
err = r.requestScaleLoop(logger, scaledJob)
if err != nil {
return "Failed to start a new scale loop with scaling logic", err
}
logger.Info("Initializing Scaling logic according to ScaledObject Specification")
logger.Info("Initializing Scaling logic according to ScaledJob Specification")
return "ScaledJob is defined correctly and is ready to scaling", nil
}

Expand Down
19 changes: 17 additions & 2 deletions pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,21 @@ func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1al
logger.V(1).Info("No change in activity")
}

condition := scaledJob.Status.Conditions.GetActiveCondition()
if condition.IsUnknown() || condition.IsTrue() != isActive {
if isActive {
if err := e.setActiveCondition(ctx, logger, scaledJob, metav1.ConditionTrue, "ScalerActive", "Scaling is performed because triggers are active"); err != nil {
logger.Error(err, "Error setting active condition when triggers are active")
return
}
} else {
if err := e.setActiveCondition(ctx, logger, scaledJob, metav1.ConditionFalse, "ScalerNotActive", "Scaling is not performed because triggers are not active"); err != nil {
logger.Error(err, "Error setting active condition when triggers are not active")
return
}
}
}

err := e.cleanUp(scaledJob)
if err != nil {
logger.Error(err, "Failed to cleanUp jobs")
Expand Down Expand Up @@ -98,10 +113,10 @@ func (e *scaleExecutor) createJobs(logger logr.Logger, scaledJob *kedav1alpha1.S
job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure
}

// Set ScaledObject instance as the owner and controller
// Set ScaledJob instance as the owner and controller
err := controllerutil.SetControllerReference(scaledJob, job, e.reconcilerScheme)
if err != nil {
logger.Error(err, "Failed to set ScaledObject as the owner of the new Job")
logger.Error(err, "Failed to set ScaledJob as the owner of the new Job")
}

err = e.client.Create(context.TODO(), job)
Expand Down