Skip to content

Commit

Permalink
workload: add deletion condition func to decide whether to delete a w…
Browse files Browse the repository at this point in the history
…orkload
  • Loading branch information
liouk committed Dec 3, 2024
1 parent 64d8d9e commit a9ce87e
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 4 deletions.
121 changes: 117 additions & 4 deletions pkg/operator/apiserver/controller/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/kubernetes"
appsv1listers "k8s.io/client-go/listers/apps/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -68,6 +69,11 @@ type Controller struct {
queue workqueue.RateLimitingInterface
versionRecorder status.VersionGetter
preRunCachesSynced []cache.InformerSynced

// deletionConditionFn checks whether the operand workload of the controller should be deleted;
// it also returns the name of said workload to be used for deletion
deletionConditionFn func() (bool, string, error)
deploymentLister appsv1listers.DeploymentLister
}

// NewController creates a brand new Controller instance.
Expand All @@ -83,8 +89,51 @@ func NewController(instanceName, operatorNamespace, targetNamespace, targetOpera
kubeClient kubernetes.Interface,
podLister corev1listers.PodLister,
informers []factory.Informer,
tagetNamespaceInformers []factory.Informer,
targetNamespaceInformers []factory.Informer,
delegate Delegate,
openshiftClusterConfigClient openshiftconfigclientv1.ClusterOperatorInterface,
eventRecorder events.Recorder,
versionRecorder status.VersionGetter,
) factory.Controller {
controllerRef := &Controller{
controllerInstanceName: factory.ControllerInstanceName(instanceName, "Workload"),
operatorNamespace: operatorNamespace,
targetNamespace: targetNamespace,
targetOperandVersion: targetOperandVersion,
operandNamePrefix: operandNamePrefix,
conditionsPrefix: conditionsPrefix,
operatorClient: operatorClient,
kubeClient: kubeClient,
podsLister: podLister,
delegate: delegate,
openshiftClusterConfigClient: openshiftClusterConfigClient,
versionRecorder: versionRecorder,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), instanceName),
}

return newController(controllerRef, informers, targetNamespaceInformers, eventRecorder)
}

// NewControllerWithDeletion creates a brand new Controller instance, which includes a deletion condition.
//
// the "instanceName" param will be used to set conditions in the status field. It will be suffixed with "WorkloadController",
// so it can end up in the condition in the form of "OAuthAPIWorkloadControllerDeploymentAvailable"
//
// the "operatorNamespace" is used to set "version-mapping" in the correct namespace
//
// the "targetNamespace" represent the namespace for the managed resource (DaemonSet)
//
// the "deletionConditionFn" will be used to check whether the workload specified by the
// returned name which is part of targetNamespace must be deleted
func NewControllerWithDeletion(instanceName, operatorNamespace, targetNamespace, targetOperandVersion, operandNamePrefix, conditionsPrefix string,
operatorClient v1helpers.OperatorClient,
kubeClient kubernetes.Interface,
podLister corev1listers.PodLister,
deploymentLister appsv1listers.DeploymentLister,
informers []factory.Informer,
targetNamespaceInformers []factory.Informer,
delegate Delegate,
deletionConditionFn func() (bool, string, error),
openshiftClusterConfigClient openshiftconfigclientv1.ClusterOperatorInterface,
eventRecorder events.Recorder,
versionRecorder status.VersionGetter,
Expand All @@ -100,14 +149,25 @@ func NewController(instanceName, operatorNamespace, targetNamespace, targetOpera
kubeClient: kubeClient,
podsLister: podLister,
delegate: delegate,
deletionConditionFn: deletionConditionFn,
deploymentLister: deploymentLister,
openshiftClusterConfigClient: openshiftClusterConfigClient,
versionRecorder: versionRecorder,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), instanceName),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), instanceName),
}

return newController(controllerRef, informers, targetNamespaceInformers, eventRecorder)
}

func newController(
controllerRef *Controller,
informers []factory.Informer,
targetNamespaceInformers []factory.Informer,
eventRecorder events.Recorder,
) factory.Controller {
c := factory.New()
for _, nsi := range tagetNamespaceInformers {
c.WithNamespaceInformer(nsi, targetNamespace)
for _, nsi := range targetNamespaceInformers {
c.WithNamespaceInformer(nsi, controllerRef.targetNamespace)
}

return c.WithSync(controllerRef.sync).
Expand All @@ -129,6 +189,14 @@ func (c *Controller) sync(ctx context.Context, controllerContext factory.SyncCon
return err
}

if c.deletionConditionFn != nil {
if conditionMet, workload, err := c.deletionConditionFn(); err != nil {
return err
} else if conditionMet {
return c.deleteWorkload(ctx, workload)
}
}

if fulfilled, err := c.delegate.PreconditionFulfilled(ctx); err != nil {
return c.updateOperatorStatus(ctx, operatorStatus, nil, false, false, []error{err})
} else if !fulfilled {
Expand Down Expand Up @@ -356,6 +424,51 @@ func (c *Controller) updateOperatorStatus(ctx context.Context, previousStatus *o
return nil
}

func (c *Controller) deleteWorkload(ctx context.Context, workloadName string) (err error) {
deploymentAvailableCondition := applyoperatorv1.OperatorCondition().
WithType(fmt.Sprintf("%sDeployment%s", c.conditionsPrefix, operatorv1.OperatorStatusTypeAvailable))

workloadDegradedCondition := applyoperatorv1.OperatorCondition().
WithType(fmt.Sprintf("%sWorkloadDegraded", c.conditionsPrefix))

status := applyoperatorv1.OperatorStatus()
defer func() {
status = status.WithConditions(deploymentAvailableCondition, workloadDegradedCondition)
if applyError := c.operatorClient.ApplyOperatorStatus(ctx, c.controllerInstanceName, status); applyError != nil {
err = applyError
}
}()

if _, err := c.deploymentLister.Deployments(c.targetNamespace).Get(workloadName); err != nil && !apierrors.IsNotFound(err) {
deploymentAvailableCondition = deploymentAvailableCondition.
WithStatus(operatorv1.ConditionFalse).
WithReason("DeletionError")

workloadDegradedCondition = workloadDegradedCondition.
WithStatus(operatorv1.ConditionTrue).
WithReason("DeletionError").
WithMessage(err.Error())
return err

} else if err == nil {
if err := c.kubeClient.AppsV1().Deployments(c.targetNamespace).Delete(ctx, workloadName, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
deploymentAvailableCondition = deploymentAvailableCondition.
WithStatus(operatorv1.ConditionFalse).
WithReason("DeletionError")

workloadDegradedCondition = workloadDegradedCondition.
WithStatus(operatorv1.ConditionTrue).
WithReason("DeletionError").
WithMessage(err.Error())
return err
}
}

deploymentAvailableCondition = deploymentAvailableCondition.WithStatus(operatorv1.ConditionTrue)
workloadDegradedCondition = workloadDegradedCondition.WithStatus(operatorv1.ConditionFalse)
return nil
}

// isUpdatingTooLong determines if updating operands takes too long.
// it returns true if the progressing condition has been set to True for at least 15 minutes
func isUpdatingTooLong(operatorStatus *operatorv1.OperatorStatus, progressingConditionType string) (bool, error) {
Expand Down
40 changes: 40 additions & 0 deletions pkg/operator/apiserver/controllerset/apiservercontrollerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,46 @@ func (cs *APIServerControllerSet) WithWorkloadController(
return cs
}

func (cs *APIServerControllerSet) WithWorkloadControllerWithDeletion(
name, operatorNamespace, targetNamespace, targetOperandVersion, operandNamePrefix, conditionsPrefix string,
kubeClient kubernetes.Interface,
delegate workload.Delegate,
deletionConditionFn func() (bool, string, error),
openshiftClusterConfigClient openshiftconfigclientv1.ClusterOperatorInterface,
versionRecorder status.VersionGetter,
kubeInformersForNamespaces v1helpers.KubeInformersForNamespaces,
informers ...factory.Informer) *APIServerControllerSet {

workloadController := workload.NewControllerWithDeletion(
name,
operatorNamespace,
targetNamespace,
targetOperandVersion,
operandNamePrefix,
conditionsPrefix,
cs.operatorClient,
kubeClient,
kubeInformersForNamespaces.PodLister(),
kubeInformersForNamespaces.InformersFor(targetNamespace).Apps().V1().Deployments().Lister(),
append(informers,
kubeInformersForNamespaces.InformersFor(targetNamespace).Core().V1().ConfigMaps().Informer(),
kubeInformersForNamespaces.InformersFor(targetNamespace).Core().V1().Secrets().Informer(),
kubeInformersForNamespaces.InformersFor(targetNamespace).Core().V1().Pods().Informer(),
kubeInformersForNamespaces.InformersFor(targetNamespace).Apps().V1().Deployments().Informer(),
kubeInformersForNamespaces.InformersFor(metav1.NamespaceSystem).Core().V1().Nodes().Informer(),
),
[]factory.Informer{kubeInformersForNamespaces.InformersFor(targetNamespace).Core().V1().Namespaces().Informer()},

delegate,
deletionConditionFn,
openshiftClusterConfigClient,
cs.eventRecorder,
versionRecorder)

cs.workloadController.controller = workloadController
return cs
}

func (cs *APIServerControllerSet) WithoutWorkloadController() *APIServerControllerSet {
cs.workloadController.controller = nil
cs.workloadController.emptyAllowed = true
Expand Down

0 comments on commit a9ce87e

Please sign in to comment.