Skip to content

Commit

Permalink
Merge pull request #33 from georgekuruvillak/predicates_annotation
Browse files Browse the repository at this point in the history
Added predicates to the controller mgr with option reconcile only if annotation present.
  • Loading branch information
Amshuman K R authored Mar 29, 2020
2 parents 5df592a + db1081f commit 534c30c
Show file tree
Hide file tree
Showing 147 changed files with 22,805 additions and 150 deletions.
3 changes: 3 additions & 0 deletions api/v1alpha1/etcd_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ type LastOperation struct {

// EtcdStatus defines the observed state of Etcd
type EtcdStatus struct {
// ObservedGeneration is the most recent generation observed for this resource.
// +optional
ObservedGeneration *int64 `json:"observedGeneration,omitempty"`
// +optional
Etcd CrossVersionObjectReference `json:"etcd,omitempty"`
// +optional
Expand Down
5 changes: 5 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions charts/etcd/templates/etcd-statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ spec:
matchLabels:
name: etcd
instance: {{ .Values.name }}
{{- if .Values.labels }}
{{ toYaml .Values.labels | indent 6 }}
{{- end }}
template:
metadata:
annotations:
Expand Down
5 changes: 5 additions & 0 deletions config/crd/bases/druid.gardener.cloud_etcds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,11 @@ spec:
type: object
lastError:
type: string
observedGeneration:
description: ObservedGeneration is the most recent generation observed for
this resource.
format: int64
type: integer
ready:
type: boolean
readyReplicas:
Expand Down
187 changes: 112 additions & 75 deletions controllers/etcd_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,18 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"

druidv1alpha1 "github.com/gardener/etcd-druid/api/v1alpha1"
"github.com/gardener/etcd-druid/pkg/chartrenderer"
"github.com/gardener/etcd-druid/pkg/common"
druidpredicates "github.com/gardener/etcd-druid/pkg/predicate"
"github.com/gardener/etcd-druid/pkg/utils"
"github.com/gardener/gardener/pkg/utils/kubernetes/health"

kubernetes "github.com/gardener/etcd-druid/pkg/client/kubernetes"
v1beta1constants "github.com/gardener/gardener/pkg/apis/core/v1beta1/constants"

"github.com/gardener/gardener/pkg/utils/imagevector"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -160,10 +165,9 @@ func (r *EtcdReconciler) InitializeControllerWithImageVector() (*EtcdReconciler,
// +kubebuilder:rbac:groups=druid.gardener.cloud,resources=etcds,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=druid.gardener.cloud,resources=etcds/status,verbs=get;update;patch

// Reconcile reconciles the <req>.
// Reconcile reconciles the etcd.
func (r *EtcdReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {

// your logic here
etcd := &druidv1alpha1.Etcd{}
if err := r.Get(context.TODO(), req.NamespacedName, etcd); err != nil {
if errors.IsNotFound(err) {
Expand All @@ -174,42 +178,34 @@ func (r *EtcdReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
// Error reading the object - requeue the request.
return ctrl.Result{}, err
}
etcdCopy := etcd.DeepCopy()
// Update the found object and write the result back if there are any changes
if !reflect.DeepEqual(etcd.Spec, etcdCopy.Spec) {
etcdCopy.Spec = etcd.Spec
}

logger.Infof("Reconciling etcd: %s/%s", etcd.GetNamespace(), etcd.GetName())
if !etcdCopy.DeletionTimestamp.IsZero() {
if !etcd.DeletionTimestamp.IsZero() {
logger.Infof("Deletion timestamp set for etcd: %s", etcd.GetName())
if err := r.removeFinalizersToDependantSecrets(etcdCopy); err != nil {
if err := r.updateEtcdErrorStatus(etcd, etcdCopy, err); err != nil {
if err := r.removeFinalizersToDependantSecrets(etcd); err != nil {
if err := r.updateEtcdErrorStatus(etcd, err); err != nil {
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Second * 5,
Requeue: true,
}, err
}
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Second * 5,
Requeue: true,
}, err
}

if sets.NewString(etcd.Finalizers...).Has(FinalizerName) {
logger.Infof("Removing finalizer (%s) from etcd %s", FinalizerName, etcd.GetName())
finalizers := sets.NewString(etcdCopy.Finalizers...)
finalizers := sets.NewString(etcd.Finalizers...)
finalizers.Delete(FinalizerName)
etcdCopy.Finalizers = finalizers.UnsortedList()
if err := r.Patch(context.TODO(), etcdCopy, client.MergeFrom(etcd)); err != nil {
if err := r.updateEtcdErrorStatus(etcd, etcdCopy, err); err != nil {
etcd.Finalizers = finalizers.UnsortedList()
if err := r.Update(context.TODO(), etcd); err != nil {
if err := r.updateEtcdErrorStatus(etcd, err); err != nil {
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Second * 5,
Requeue: true,
}, err
}
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Second * 5,
Requeue: true,
}, err
}
}
Expand All @@ -221,55 +217,46 @@ func (r *EtcdReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
if finalizers := sets.NewString(etcd.Finalizers...); !finalizers.Has(FinalizerName) {
logger.Infof("Adding finalizer (%s) to etcd %s", FinalizerName, etcd.GetName())
finalizers.Insert(FinalizerName)
etcdCopy.Finalizers = finalizers.UnsortedList()
if err := r.Patch(context.TODO(), etcdCopy, client.MergeFrom(etcd)); err != nil {
if err := r.updateEtcdErrorStatus(etcd, etcdCopy, err); err != nil {
etcd.Finalizers = finalizers.UnsortedList()
if err := r.Update(context.TODO(), etcd); err != nil {
if err := r.updateEtcdErrorStatus(etcd, err); err != nil {
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Second * 5,
Requeue: true,
}, err
}
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Second * 5,
Requeue: true,
}, err
}
}
if err := r.addFinalizersToDependantSecrets(etcdCopy); err != nil {
if err := r.updateEtcdErrorStatus(etcd, etcdCopy, err); err != nil {
if err := r.addFinalizersToDependantSecrets(etcd); err != nil {
if err := r.updateEtcdErrorStatus(etcd, err); err != nil {
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Second * 5,
Requeue: true,
}, err
}
}

svc, ss, err := r.reconcileEtcd(etcdCopy)
svc, ss, err := r.reconcileEtcd(etcd)
if err != nil {
if err := r.updateEtcdErrorStatus(etcd, etcdCopy, err); err != nil {
if err := r.updateEtcdErrorStatus(etcd, err); err != nil {
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Second * 5,
Requeue: true,
}, err
}
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Second * 5,
Requeue: true,
}, err
}

if err := r.updateEtcdStatus(etcdCopy, etcd, svc, ss); err != nil {
if err := r.updateEtcdStatus(etcd, svc, ss); err != nil {
return ctrl.Result{
Requeue: true,
RequeueAfter: time.Second * 5,
Requeue: true,
}, err
}

logger.Infof("Successfully reconciled etcd: %s", etcd.GetName())

return ctrl.Result{
Requeue: true,
RequeueAfter: time.Minute * 5,
Requeue: false,
}, nil
}

Expand Down Expand Up @@ -572,6 +559,7 @@ func (r *EtcdReconciler) reconcileStatefulSet(cm *corev1.ConfigMap, svc *corev1.
if err := r.Get(context.TODO(), types.NamespacedName{Name: filteredStatefulSets[0].Name, Namespace: filteredStatefulSets[0].Namespace}, ss); err != nil {
return nil, err
}

// Statefulset is claimed by for this etcd. Just sync the specs
if ss, err = r.syncStatefulSetSpec(ss, cm, svc, etcd, values); err != nil {
return nil, err
Expand Down Expand Up @@ -643,10 +631,16 @@ func (r *EtcdReconciler) syncStatefulSetSpec(ss *appsv1.StatefulSet, cm *corev1.
if reflect.DeepEqual(ss.Spec, decoded.Spec) {
return ss, nil
}

ssCopy := ss.DeepCopy()
ssCopy.Spec.Replicas = decoded.Spec.Replicas
ssCopy.Spec.UpdateStrategy = decoded.Spec.UpdateStrategy

recreateSTS := false
if !reflect.DeepEqual(ssCopy.Spec.Selector, decoded.Spec.Selector) {
recreateSTS = true
}

// Applying suggestions from
containers := getContainerMapFromPodTemplateSpec(ssCopy.Spec.Template.Spec)
for i, c := range decoded.Spec.Template.Spec.Containers {
Expand All @@ -659,9 +653,14 @@ func (r *EtcdReconciler) syncStatefulSetSpec(ss *appsv1.StatefulSet, cm *corev1.

ssCopy.Spec.Template = decoded.Spec.Template

err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return r.Patch(context.TODO(), ssCopy, client.MergeFrom(ss))
})
if recreateSTS {
logger.Infof("selector changed, recreating statefulset: %s", ssCopy.Name)
err = r.recreateStatefulset(decoded)
} else {
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return r.Patch(context.TODO(), ssCopy, client.MergeFrom(ss))
})
}

// Ignore the precondition violated error, this machine is already updated
// with the desired label.
Expand All @@ -670,12 +669,25 @@ func (r *EtcdReconciler) syncStatefulSetSpec(ss *appsv1.StatefulSet, cm *corev1.
err = nil
}
if err != nil {
logger.Infof("patching statefulset failed for %s", ss.Name)
return nil, err
}
return ssCopy, err
}

func (r *EtcdReconciler) recreateStatefulset(ss *appsv1.StatefulSet) error {
skipDelete := false
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if !skipDelete {
if err := r.Delete(context.TODO(), ss); err != nil && !errors.IsNotFound(err) {
return err
}
}
skipDelete = true
return r.Create(context.TODO(), ss)
})
return err
}

func (r *EtcdReconciler) getStatefulSetFromEtcd(etcd *druidv1alpha1.Etcd, cm *corev1.ConfigMap, svc *corev1.Service, values map[string]interface{}) (*appsv1.StatefulSet, error) {
var err error
decoded := &appsv1.StatefulSet{}
Expand Down Expand Up @@ -852,8 +864,11 @@ func (r *EtcdReconciler) getMapFromEtcd(etcd *druidv1alpha1.Etcd) (map[string]in
"storageContainer": etcd.Spec.Backup.Store.Container,
"storePrefix": etcd.Spec.Backup.Store.Prefix,
"storageProvider": utils.StorageProviderFromInfraProvider(etcd.Spec.Backup.Store.Provider),
"storeSecret": etcd.Spec.Backup.Store.SecretRef.Name,
}
if etcd.Spec.Backup.Store.SecretRef != nil {
storeValues["storeSecret"] = etcd.Spec.Backup.Store.SecretRef.Name
}

values["store"] = storeValues
}

Expand Down Expand Up @@ -937,19 +952,19 @@ func (r *EtcdReconciler) removeFinalizersToDependantSecrets(etcd *druidv1alpha1.
return nil
}

func (r *EtcdReconciler) updateEtcdErrorStatus(etcdCopy, etcd *druidv1alpha1.Etcd, lastError error) error {

func (r *EtcdReconciler) updateEtcdErrorStatus(etcd *druidv1alpha1.Etcd, lastError error) error {
lastErrStr := fmt.Sprintf("%v", lastError)
etcdCopy.Status.LastError = &lastErrStr
if err := r.Status().Update(context.TODO(), etcdCopy); err != nil && !errors.IsNotFound(err) {
etcd.Status.LastError = &lastErrStr
etcd.Status.ObservedGeneration = &etcd.Generation
if err := r.Status().Update(context.TODO(), etcd); err != nil && !errors.IsNotFound(err) {
return err
}
return nil
return r.removeOperationAnnotation(etcd)
}

func (r *EtcdReconciler) updateEtcdStatus(etcdCopy, etcd *druidv1alpha1.Etcd, svc *corev1.Service, ss *appsv1.StatefulSet) error {
func (r *EtcdReconciler) updateEtcdStatus(etcd *druidv1alpha1.Etcd, svc *corev1.Service, ss *appsv1.StatefulSet) error {
svcName := svc.Name
etcdCopy.Status.Etcd = druidv1alpha1.CrossVersionObjectReference{
etcd.Status.Etcd = druidv1alpha1.CrossVersionObjectReference{
APIVersion: ss.APIVersion,
Kind: ss.Kind,
Name: ss.Name,
Expand All @@ -958,18 +973,28 @@ func (r *EtcdReconciler) updateEtcdStatus(etcdCopy, etcd *druidv1alpha1.Etcd, sv
for _, condition := range ss.Status.Conditions {
conditions = append(conditions, convertConditionsToEtcd(&condition))
}
etcdCopy.Status.Conditions = conditions
etcd.Status.Conditions = conditions

// To be changed once we have multiple replicas.
etcdCopy.Status.CurrentReplicas = ss.Status.CurrentReplicas
etcdCopy.Status.ReadyReplicas = ss.Status.ReadyReplicas
etcdCopy.Status.UpdatedReplicas = ss.Status.UpdatedReplicas
etcdCopy.Status.Ready = (ss.Status.ReadyReplicas == ss.Status.Replicas)
etcdCopy.Status.ServiceName = &svcName

if err := r.Status().Update(context.TODO(), etcdCopy); err != nil && !errors.IsNotFound(err) {
etcd.Status.CurrentReplicas = ss.Status.CurrentReplicas
etcd.Status.ReadyReplicas = ss.Status.ReadyReplicas
etcd.Status.UpdatedReplicas = ss.Status.UpdatedReplicas
etcd.Status.Ready = (health.CheckStatefulSet(ss) == nil)
etcd.Status.ServiceName = &svcName
etcd.Status.LastError = nil
etcd.Status.ObservedGeneration = &etcd.Generation

if err := r.Status().Update(context.TODO(), etcd); err != nil && !errors.IsNotFound(err) {
return err
}
return r.removeOperationAnnotation(etcd)
}

func (r *EtcdReconciler) removeOperationAnnotation(etcd *druidv1alpha1.Etcd) error {
if _, ok := etcd.Annotations[v1beta1constants.GardenerOperation]; ok {
delete(etcd.Annotations, v1beta1constants.GardenerOperation)
return r.Update(context.TODO(), etcd)
}
return nil
}

Expand All @@ -983,15 +1008,6 @@ func convertConditionsToEtcd(condition *appsv1.StatefulSetCondition) druidv1alph
}
}

// SetupWithManager sets up manager with a new controller and r as the reconcile.Reconciler
func (r *EtcdReconciler) SetupWithManager(mgr ctrl.Manager, workers int) error {
return ctrl.NewControllerManagedBy(mgr).WithOptions(controller.Options{
MaxConcurrentReconciles: workers,
}).For(&druidv1alpha1.Etcd{}).
Owns(&appsv1.StatefulSet{}).
Complete(r)
}

func (r *EtcdReconciler) claimStatefulSets(etcd *druidv1alpha1.Etcd, selector labels.Selector, ss *appsv1.StatefulSetList) ([]*appsv1.StatefulSet, error) {
// If any adoptions are attempted, we should first recheck for deletion with
// an uncached quorum read sometime after listing Machines (see #42639).
Expand Down Expand Up @@ -1045,3 +1061,24 @@ func (r *EtcdReconciler) claimConfigMaps(etcd *druidv1alpha1.Etcd, selector labe
cm := NewEtcdDruidRefManager(r, etcd, selector, etcdGVK, canAdoptFunc)
return cm.ClaimConfigMaps(ss)
}

// SetupWithManager sets up manager with a new controller and r as the reconcile.Reconciler
func (r *EtcdReconciler) SetupWithManager(mgr ctrl.Manager, workers int, ignoreOperationAnnotation bool) error {
predicates := []predicate.Predicate{
druidpredicates.GenerationChangedPredicate{},
druidpredicates.LastOperationNotSuccessful(),
}
builder := ctrl.NewControllerManagedBy(mgr).WithOptions(controller.Options{
MaxConcurrentReconciles: workers,
})
if !ignoreOperationAnnotation {
predicates = append(predicates, druidpredicates.HasOperationAnnotation())
}
builder = builder.WithEventFilter(druidpredicates.Or(predicates...))
return builder.
For(&druidv1alpha1.Etcd{}).
Owns(&appsv1.StatefulSet{}).
Owns(&v1.Service{}).
Owns(&v1.ConfigMap{}).
Complete(r)
}
Loading

0 comments on commit 534c30c

Please sign in to comment.