Skip to content

Commit

Permalink
feat: Run Promise configure workflows at least every 10 hours
Browse files Browse the repository at this point in the history
This is the first of a handful of features which will combine to
provide regular reconciliation across the full Kratix platform.

Right now the reconciliation time is set to 10 hours but will be
configurable in the future.

There are two notes here to be aware of:
1. If the kratix controller restarts, there may be *more*
   reconciliation loops scheduled. That is why this is saying
   "at least" every 10 hours rather than "at most" or "exactly".
2. The workflows will run, but if the declarative outputs are not
   changed, the write to the statestore will not be triggered.
   Making sure to reconcile the actual contents of the statestore
   with the declared contents is an upcoming piece of work
   in this stream.

closes #219

Co-authored-by: Sapphire Mason-Brown <[email protected]>
Co-authored-by: Derik Evangelista <[email protected]>
Co-authored-by: Rich Barton-Cooper <[email protected]>
  • Loading branch information
4 people committed Oct 15, 2024
1 parent 91a53d6 commit bca35ff
Show file tree
Hide file tree
Showing 13 changed files with 275 additions and 52 deletions.
10 changes: 10 additions & 0 deletions api/v1alpha1/promise_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ type PromiseStatus struct {
Status string `json:"status,omitempty"`
RequiredPromises []RequiredPromiseStatus `json:"requiredPromises,omitempty"`
RequiredBy []RequiredBy `json:"requiredBy,omitempty"`
LastAvailableTime *metav1.Time `json:"lastAvailableTime,omitempty"`
}

type PromiseSummary struct {
Expand Down Expand Up @@ -252,6 +253,15 @@ func (d Dependencies) Marshal() ([]byte, error) {
return io.ReadAll(buf)
}

// GetCondition looks up the status condition whose Type equals conditionType.
// It returns a pointer to the matching element inside p.Status.Conditions
// (not a copy), or nil when no condition of that type is present.
func (p *Promise) GetCondition(conditionType string) *metav1.Condition {
	for idx := 0; idx < len(p.Status.Conditions); idx++ {
		candidate := &p.Status.Conditions[idx]
		if candidate.Type != conditionType {
			continue
		}
		return candidate
	}
	return nil
}

//+kubebuilder:object:root=true

// PromiseList contains a list of Promise
Expand Down
4 changes: 4 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions config/crd/bases/platform.kratix.io_promises.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ spec:
type: array
kind:
type: string
lastAvailableTime:
format: date-time
type: string
observedGeneration:
format: int64
type: integer
Expand Down
2 changes: 2 additions & 0 deletions controllers/assets/promise-with-workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ apiVersion: platform.kratix.io/v1alpha1
kind: Promise
metadata:
name: promise-with-workflow
labels:
kratix.io/promise-version: v1.1.0
spec:
api:
apiVersion: apiextensions.k8s.io/v1
Expand Down
83 changes: 67 additions & 16 deletions controllers/promise_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"slices"
"strings"
"sync"
"time"

"github.com/syntasso/kratix/lib/objectutil"
Expand Down Expand Up @@ -69,6 +70,10 @@ type PromiseReconciler struct {
StartedDynamicControllers map[string]*DynamicResourceRequestController
RestartManager func()
NumberOfJobsToKeep int

ScheduledReconciliation map[string]metav1.Time

mutex sync.Mutex
}

const (
Expand All @@ -77,6 +82,7 @@ const (
dynamicControllerDependantResourcesCleanupFinalizer = v1alpha1.KratixPrefix + "dynamic-controller-dependant-resources-cleanup"
crdCleanupFinalizer = v1alpha1.KratixPrefix + "api-crd-cleanup"
dependenciesCleanupFinalizer = v1alpha1.KratixPrefix + "dependencies-cleanup"
lastUpdatedAtAnnotation = v1alpha1.KratixPrefix + "last-updated-at"

requirementStateInstalled = "Requirement installed"
requirementStateNotInstalled = "Requirement not installed"
Expand All @@ -95,9 +101,9 @@ var (
// fastRequeue can be used whenever we want to quickly requeue, and we don't expect
// an error to occur. Example: we delete a resource, we then requeue
// to check it's been deleted. Here we can use a fastRequeue instead of a defaultRequeue
fastRequeue = ctrl.Result{RequeueAfter: 1 * time.Second}
defaultRequeue = ctrl.Result{RequeueAfter: 5 * time.Second}
slowRequeue = ctrl.Result{RequeueAfter: 15 * time.Second}
fastRequeue = ctrl.Result{RequeueAfter: 5 * time.Second}
defaultRequeue = ctrl.Result{RequeueAfter: 15 * time.Second}
slowRequeue = ctrl.Result{RequeueAfter: 60 * time.Second}
)

//+kubebuilder:rbac:groups=platform.kratix.io,resources=promises,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -152,9 +158,16 @@ func (r *PromiseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
//Set status to unavailable, at the end of this function we set it to
//available. If at anytime we return early, it persisted as unavailable
promise.Status.Status = v1alpha1.PromiseStatusUnavailable
updated, err := r.ensureRequiredPromiseStatusIsUpToDate(ctx, promise)
if err != nil || updated {
return ctrl.Result{}, err
requirementsChanged := r.hasPromiseRequirementsChanged(ctx, promise)

scheduledReconciliation := promise.Status.LastAvailableTime != nil && time.Since(promise.Status.LastAvailableTime.Time) > DefaultReconciliationInterval
if (requirementsChanged || scheduledReconciliation) && originalStatus == v1alpha1.PromiseStatusAvailable {
err := r.Client.Status().Update(ctx, promise)
if err != nil {
return ctrl.Result{}, err
}
logger.Info("Requeueing: requirements changed or scheduled reconciliation")
return ctrl.Result{}, nil
}

//TODO handle removing finalizer
Expand Down Expand Up @@ -204,13 +217,14 @@ func (r *PromiseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return addFinalizers(opts, promise, []string{dependenciesCleanupFinalizer})
}

requeue, err = r.reconcileDependenciesAndPromiseWorkflows(opts, promise)
ctrlResult, err := r.reconcileDependenciesAndPromiseWorkflows(opts, promise)
if err != nil {
return ctrl.Result{}, err
}

if requeue != nil {
return *requeue, nil
if ctrlResult != nil {
logger.Info("stopping reconciliation while reconciling dependencies")
return *ctrlResult, nil
}

if promise.ContainsAPI() {
Expand Down Expand Up @@ -240,10 +254,12 @@ func (r *PromiseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct

if promise.GetGeneration() != promise.Status.ObservedGeneration {
if promise.GetGeneration() != 1 {
logger.Info("reconciling all RRs")
if err := r.reconcileAllRRs(rrGVK); err != nil {
return ctrl.Result{}, err
}
}
logger.Info("updating observed generation", "from", promise.Status.ObservedGeneration, "to", promise.GetGeneration())
promise.Status.ObservedGeneration = promise.GetGeneration()
return ctrl.Result{}, r.Client.Status().Update(ctx, promise)
}
Expand All @@ -252,24 +268,38 @@ func (r *PromiseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

if originalStatus == v1alpha1.PromiseStatusAvailable {
return ctrl.Result{}, nil
return r.nextReconciliation(promise, logger)
}

logger.Info("Promise status being set to Available")
promise.Status.Status = v1alpha1.PromiseStatusAvailable
promise.Status.LastAvailableTime = &metav1.Time{Time: time.Now()}
return ctrl.Result{}, r.Client.Status().Update(ctx, promise)
}

func (r *PromiseReconciler) ensureRequiredPromiseStatusIsUpToDate(ctx context.Context, promise *v1alpha1.Promise) (bool, error) {
// nextReconciliation decides whether a periodic reconciliation still has to be
// scheduled for the given Promise.
//
// When no reconciliation is recorded under the Promise's name, or the recorded
// time has already passed, it records a new one DefaultReconciliationInterval
// from now and returns a Result that requeues after that interval. Otherwise a
// requeue is already pending, so an empty Result is returned and no additional
// timer is stacked on top of the existing one.
//
// The schedule is tracked only in the in-memory r.ScheduledReconciliation map,
// which is accessed while holding r.mutex. The returned error is always nil.
func (r *PromiseReconciler) nextReconciliation(promise *v1alpha1.Promise, logger logr.Logger) (ctrl.Result, error) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	// Assigning into a nil map panics; initialise lazily so a reconciler that
	// was constructed without the map still works.
	if r.ScheduledReconciliation == nil {
		r.ScheduledReconciliation = make(map[string]metav1.Time)
	}

	name := promise.GetName()
	// Take a single timestamp so the expiry check and the new schedule are
	// computed from the same instant.
	now := time.Now()

	scheduled, found := r.ScheduledReconciliation[name]
	if found && !now.After(scheduled.Time) {
		logger.Info("Reconciliation already scheduled", "scheduledReconciliationTimestamp", scheduled.Time.String(), "labels", promise.Labels)
		return ctrl.Result{}, nil
	}

	next := metav1.NewTime(now.Add(DefaultReconciliationInterval))
	r.ScheduledReconciliation[name] = next

	logger.Info("Scheduling next reconciliation", "scheduledReconciliationTimestamp", next.Time.String())
	return ctrl.Result{RequeueAfter: DefaultReconciliationInterval}, nil
}

// hasPromiseRequirementsChanged asks generateStatusAndMarkRequirements for the
// latest condition and requirements of the Promise, passes them to
// updateRequirementsStatusOnPromise and updateConditionOnPromise, and reports
// whether either helper signalled a change.
// NOTE(review): presumably the two helpers mutate promise.Status in memory and
// return true when the field differed — confirm against their definitions.
func (r *PromiseReconciler) hasPromiseRequirementsChanged(ctx context.Context, promise *v1alpha1.Promise) bool {
latestCondition, latestRequirements := r.generateStatusAndMarkRequirements(ctx, promise)

requirementsFieldChanged := updateRequirementsStatusOnPromise(promise, promise.Status.RequiredPromises, latestRequirements)
conditionsFieldChanged := updateConditionOnPromise(promise, latestCondition)

if conditionsFieldChanged || requirementsFieldChanged {
return true, r.Client.Status().Update(ctx, promise)
}

return false, nil
return conditionsFieldChanged || requirementsFieldChanged
}

func updateConditionOnPromise(promise *v1alpha1.Promise, latestCondition metav1.Condition) bool {
Expand Down Expand Up @@ -377,6 +407,19 @@ func (r *PromiseReconciler) reconcileDependenciesAndPromiseWorkflows(o opts, pro
}

o.logger.Info("Promise contains workflows.promise.configure, reconciling workflows")
pipelineCompletedCondition := promise.GetCondition(string(resourceutil.PipelineCompletedCondition))
forcePipelineRun := pipelineCompletedCondition != nil && pipelineCompletedCondition.Status == "True" && time.Since(pipelineCompletedCondition.LastTransitionTime.Time) > DefaultReconciliationInterval
if forcePipelineRun {
o.logger.Info("Pipeline completed too long ago... forcing the reconciliation", "lastTransitionTime", pipelineCompletedCondition.LastTransitionTime.Time.String())
if promise.Labels == nil {
promise.Labels = make(map[string]string)
}
promise.Labels[resourceutil.ManualReconciliationLabel] = "true"
if err := r.Client.Update(o.ctx, promise); err != nil {
return &ctrl.Result{}, err
}
}

unstructuredPromise, err := promise.ToUnstructured()
if err != nil {
return nil, err
Expand Down Expand Up @@ -977,6 +1020,14 @@ func (r *PromiseReconciler) applyWorkForStaticDependencies(o opts, promise *v1al
} else {
op = "updated"
existingWork.Spec = work.Spec

ann := existingWork.GetAnnotations()
if ann == nil {
ann = map[string]string{}
}
ann[lastUpdatedAtAnnotation] = time.Now().Local().String()
existingWork.SetAnnotations(ann)

err = r.Client.Update(o.ctx, existingWork)
}

Expand Down
Loading

0 comments on commit bca35ff

Please sign in to comment.