Skip to content

Commit

Permalink
chore: separate PPND logic out of pipelinecontroller file (#292)
Browse files Browse the repository at this point in the history
Signed-off-by: Julie Vogelman <[email protected]>
Co-authored-by: xdevxy <[email protected]>
  • Loading branch information
juliev0 and xdevxy authored Sep 24, 2024
1 parent f4b7035 commit 1593993
Show file tree
Hide file tree
Showing 2 changed files with 258 additions and 246 deletions.
246 changes: 0 additions & 246 deletions internal/controller/pipelinerollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,247 +533,6 @@ func (r *PipelineRolloutReconciler) processExistingPipeline(ctx context.Context,
}
return nil
}

// TODO: move PPND logic out to its own separate file
// normal sequence of events when we need to pause:
// - set Pipeline's desiredPhase=Paused
// - wait for the desire to Pause to be reconciled completely
// - if we need to update the Pipeline spec:
// - update it
// - wait for the spec update to be reconciled completely
//
// - as long as there's no other requirement to pause, set desiredPhase=Running
// return boolean for whether we can stop the PPND process
func (r *PipelineRolloutReconciler) processExistingPipelineWithPPND(ctx context.Context, pipelineRollout *apiv1.PipelineRollout,
existingPipelineDef, newPipelineDef *kubernetes.GenericObject, pipelineNeedsToUpdate bool) (bool, error) {

numaLogger := logger.FromContext(ctx)

needsPaused, err := r.shouldBePaused(ctx, pipelineRollout, existingPipelineDef, newPipelineDef)
if err != nil {
return false, err
}
if needsPaused == nil { // not enough information to know
return false, errors.New("not enough information available to know if we need to pause")
}
shouldBePaused := *needsPaused

var newPipelineSpec PipelineSpec
if err := json.Unmarshal(newPipelineDef.Spec.Raw, &newPipelineSpec); err != nil {
return false, fmt.Errorf("failed to convert new Pipeline spec %q into PipelineSpec type, err=%v", string(newPipelineDef.Spec.Raw), err)
}
if err := r.setPipelineLifecycle(ctx, shouldBePaused, existingPipelineDef); err != nil {
return false, err
}
// update the ResourceVersion in the newPipelineDef in case it got updated
newPipelineDef.ResourceVersion = existingPipelineDef.ResourceVersion

// if it's safe to Update and we need to, do it now
if pipelineNeedsToUpdate {
if !shouldBePaused || (shouldBePaused && isPipelinePausedOrUnpausible(ctx, existingPipelineDef)) {
numaLogger.Infof("it's safe to update Pipeline so updating now")
r.recorder.Eventf(pipelineRollout, "Normal", "PipelineUpdate", "it's safe to update Pipeline so updating now")

if shouldBePaused {
err = withDesiredPhase(newPipelineDef, "Paused")
if err != nil {
return false, err
}
}
err = kubernetes.UpdateCR(ctx, r.restConfig, newPipelineDef, "pipelines")
if err != nil {
return false, err
}
pipelineRollout.Status.MarkDeployed(pipelineRollout.Generation)
}
}

// are we done with PPND?
doneWithPPND := !shouldBePaused

// but if the PipelineRollout says to pause and we're Paused, this is also "doneWithPPND"
specBasedPause := (newPipelineSpec.Lifecycle.DesiredPhase == string(numaflowv1.PipelinePhasePaused) || newPipelineSpec.Lifecycle.DesiredPhase == string(numaflowv1.PipelinePhasePausing))
if specBasedPause && isPipelinePausedOrUnpausible(ctx, existingPipelineDef) {
doneWithPPND = true
}

return doneWithPPND, nil
}

// Does the Pipeline need to be paused?
// This is based on:
//
// any difference in spec between PipelineRollout and Pipeline, with the exception of lifecycle.desiredPhase field
// any pause request coming from isbsvc or Numaflow Controller
// spec says to pause
// pipeline spec change is still being reconciled
//
// return whether to pause, not to pause, or otherwise unknown
func (r *PipelineRolloutReconciler) shouldBePaused(ctx context.Context, pipelineRollout *apiv1.PipelineRollout, existingPipelineDef, newPipelineDef *kubernetes.GenericObject) (*bool, error) {
numaLogger := logger.FromContext(ctx)

var newPipelineSpec PipelineSpec
if err := json.Unmarshal(newPipelineDef.Spec.Raw, &newPipelineSpec); err != nil {
return nil, fmt.Errorf("failed to convert new Pipeline spec %q into PipelineSpec type, err=%v", string(newPipelineDef.Spec.Raw), err)
}
var existingPipelineSpec PipelineSpec
if err := json.Unmarshal(existingPipelineDef.Spec.Raw, &existingPipelineSpec); err != nil {
return nil, fmt.Errorf("failed to convert existing Pipeline spec %q into PipelineSpec type, err=%v", string(existingPipelineDef.Spec.Raw), err)
}

pipelineNeedsToUpdate, err := pipelineSpecNeedsUpdating(ctx, existingPipelineDef, newPipelineDef)
if err != nil {
return nil, err
}

// is the Pipeline currently being reconciled?
pipelineUpdating, err := pipelineIsUpdating(newPipelineDef, existingPipelineDef)
if err != nil {
return nil, err
}

// is the Pipeline currently being reconciled while our desiredPhase==Paused?
// only in this circumstance do we need to make sure to remain Paused until that reconciliation is complete
existingPipelinePauseDesired := existingPipelineSpec.Lifecycle.DesiredPhase == string(numaflowv1.PipelinePhasePaused)
pipelineUpdating = pipelineUpdating && existingPipelinePauseDesired

// Is either Numaflow Controller or ISBService trying to update (such that we need to pause)?
externalPauseRequest, pauseRequestsKnown, err := r.checkForPauseRequest(ctx, pipelineRollout, getISBSvcName(newPipelineSpec))
if err != nil {
return nil, err
}

// check to see if the PipelineRollout spec itself says to Pause
specBasedPause := (newPipelineSpec.Lifecycle.DesiredPhase == string(numaflowv1.PipelinePhasePaused) || newPipelineSpec.Lifecycle.DesiredPhase == string(numaflowv1.PipelinePhasePausing))

unpausible := checkPipelineStatus(ctx, existingPipelineDef, numaflowv1.PipelinePhaseFailed)

shouldBePaused := (pipelineNeedsToUpdate || pipelineUpdating || externalPauseRequest || specBasedPause) && !unpausible
numaLogger.Debugf("shouldBePaused=%t, pipelineNeedsToUpdate=%t, pipelineUpdating=%t, externalPauseRequest=%t, specBasedPause=%t, unpausible=%t",
shouldBePaused, pipelineNeedsToUpdate, pipelineUpdating, externalPauseRequest, specBasedPause, unpausible)

// if we have incomplete pause request information (i.e. numaflowcontrollerrollout or isbservicerollout not yet reconciled), don't return
// that it's okay to pause
if !shouldBePaused && !pauseRequestsKnown {
numaLogger.Debugf("incomplete pause request information")
return nil, nil
}

return &shouldBePaused, nil
}

// do we need to start the PPND process, if we haven't already?
// this is based on if:
//
// there's any difference in spec between PipelineRollout and Pipeline
// any pause request coming from isbsvc or Numaflow Controller
func (r *PipelineRolloutReconciler) needPPND(ctx context.Context, pipelineRollout *apiv1.PipelineRollout, newPipelineDef *kubernetes.GenericObject, pipelineUpdateRequiringPPND bool) (*bool, error) {
numaLogger := logger.FromContext(ctx)

var newPipelineSpec PipelineSpec
if err := json.Unmarshal(newPipelineDef.Spec.Raw, &newPipelineSpec); err != nil {
return nil, fmt.Errorf("failed to convert new Pipeline spec %q into PipelineSpec type, err=%v", string(newPipelineDef.Spec.Raw), err)
}

// Is either Numaflow Controller or ISBService trying to update (such that we need to pause)?
externalPauseRequest, pauseRequestsKnown, err := r.checkForPauseRequest(ctx, pipelineRollout, getISBSvcName(newPipelineSpec))
if err != nil {
return nil, err
}

needPPND := externalPauseRequest || pipelineUpdateRequiringPPND
numaLogger.Debugf("needPPND=%t, externalPauseRequest=%t, pipelineUpdateRequiringPPND=%t", needPPND, externalPauseRequest, pipelineUpdateRequiringPPND)

// if we have incomplete pause request information (i.e. numaflowcontrollerrollout or isbservicerollout not yet reconciled), don't return
// that it's okay not to pause because we don't know for sure
if !needPPND && !pauseRequestsKnown {
numaLogger.Debugf("incomplete pause request information")
return nil, nil
}

return &needPPND, nil
}

// return true if Pipeline (or its children) is still in the process of being reconciled
func pipelineIsUpdating(newPipelineDef *kubernetes.GenericObject, existingPipelineDef *kubernetes.GenericObject) (bool, error) {
existingPipelineStatus, err := kubernetes.ParseStatus(existingPipelineDef)
if err != nil {
return false, err
}
// if Pipeline's ObservedGeneration is old, then Numaflow Controller hasn't even seen the generation change yet
if !pipelineObservedGenerationCurrent(newPipelineDef.Generation, existingPipelineStatus.ObservedGeneration) {
return true, nil
}

// note if Pipeline's children are still being updated
unhealthyOrProgressing, _ := checkChildResources(existingPipelineStatus.Conditions, func(c metav1.Condition) bool {
return c.Status == metav1.ConditionFalse
})

return unhealthyOrProgressing, nil

}

// make sure our Pipeline's Lifecycle is what we need it to be
func (r *PipelineRolloutReconciler) setPipelineLifecycle(ctx context.Context, pause bool, existingPipelineDef *kubernetes.GenericObject) error {
numaLogger := logger.FromContext(ctx)
var existingPipelineSpec PipelineSpec
if err := json.Unmarshal(existingPipelineDef.Spec.Raw, &existingPipelineSpec); err != nil {
return err
}
lifeCycleIsPaused := existingPipelineSpec.Lifecycle.DesiredPhase == string(numaflowv1.PipelinePhasePaused)

if pause && !lifeCycleIsPaused {
numaLogger.Info("pausing pipeline")
r.recorder.Eventf(existingPipelineDef, "Normal", "PipelinePause", "pausing pipeline")
if err := GetPauseModule().pausePipeline(ctx, r.restConfig, existingPipelineDef); err != nil {
return err
}
} else if !pause && lifeCycleIsPaused {
numaLogger.Info("resuming pipeline")
r.recorder.Eventf(existingPipelineDef, "Normal", "PipelineResume", "resuming pipeline")

run, err := GetPauseModule().runPipelineIfSafe(ctx, r.restConfig, existingPipelineDef)
if err != nil {
return err
}
if !run {
numaLogger.Infof("new pause request, can't resume pipeline at this time, will try again later")
r.recorder.Eventf(existingPipelineDef, "Normal", "PipelineResumeFailed", "new pause request, can't resume pipeline at this time, will try again later")
}
}
return nil
}

// check for all pause requests for this Pipeline (i.e. both from Numaflow Controller and ISBService)
// return:
// - whether there's a pause request
// - whether all pause requests are known
// - error if any
func (r *PipelineRolloutReconciler) checkForPauseRequest(ctx context.Context, pipelineRollout *apiv1.PipelineRollout, isbsvcName string) (bool, bool, error) {
numaLogger := logger.FromContext(ctx)

pm := GetPauseModule()

// Is either Numaflow Controller or ISBService trying to update (such that we need to pause)?
controllerPauseRequest, found := pm.getPauseRequest(pm.getNumaflowControllerKey(pipelineRollout.Namespace))
if !found {
numaLogger.Debugf("No pause request found for numaflow controller on namespace %q", pipelineRollout.Namespace)
return false, false, nil

}
controllerRequestsPause := controllerPauseRequest != nil && *controllerPauseRequest

isbsvcPauseRequest, found := pm.getPauseRequest(pm.getISBServiceKey(pipelineRollout.Namespace, isbsvcName))
if !found {
numaLogger.Debugf("No pause request found for isbsvc %q on namespace %q", isbsvcName, pipelineRollout.Namespace)
return false, false, nil
}
isbsvcRequestsPause := (isbsvcPauseRequest != nil && *isbsvcPauseRequest)

return controllerRequestsPause || isbsvcRequestsPause, true, nil
}

func pipelineObservedGenerationCurrent(generation int64, observedGeneration int64) bool {
return generation <= observedGeneration
}
Expand Down Expand Up @@ -899,11 +658,6 @@ func (r *PipelineRolloutReconciler) SetupWithManager(mgr ctrl.Manager) error {
return nil
}

func isPipelinePausedOrUnpausible(ctx context.Context, pipeline *kubernetes.GenericObject) bool {
// contract with Numaflow is that unpausible Pipelines are "Failed" pipelines
return checkPipelineStatus(ctx, pipeline, numaflowv1.PipelinePhasePaused) || checkPipelineStatus(ctx, pipeline, numaflowv1.PipelinePhaseFailed)
}

// pipelineSpecNeedsUpdating() tests for essential equality, with any irrelevant fields eliminated from the comparison
func pipelineSpecNeedsUpdating(ctx context.Context, a *kubernetes.GenericObject, b *kubernetes.GenericObject) (bool, error) {
numaLogger := logger.FromContext(ctx)
Expand Down
Loading

0 comments on commit 1593993

Please sign in to comment.