diff --git a/pkg/controller/operators/olm/operator.go b/pkg/controller/operators/olm/operator.go index 84dc363275..1029b0face 100644 --- a/pkg/controller/operators/olm/operator.go +++ b/pkg/controller/operators/olm/operator.go @@ -330,7 +330,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat ctx, queueinformer.WithLogger(op.logger), queueinformer.WithQueue(objGCQueue), - queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncGCObject).ToSyncerWithDelete(op.deleteGCObject)), + queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncGCObject).ToSyncer()), ) if err != nil { return nil, err @@ -348,7 +348,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat ctx, queueinformer.WithLogger(op.logger), queueinformer.WithQueue(objGCQueue), - queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncGCObject).ToSyncerWithDelete(op.deleteGCObject)), + queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncGCObject).ToSyncer()), ) if err != nil { return nil, err @@ -572,17 +572,17 @@ func (a *Operator) RegisterCSVWatchNotification(csvNotification csvutility.Watch a.csvNotification = csvNotification } -func (a *Operator) syncGCObject(obj interface{}) error { - _, ok := obj.(metav1.Object) - if !ok { - a.logger.Warn("object sync: casting to metav1.Object failed") - return nil - } +// func (a *Operator) syncGCObject(obj interface{}) error { +// _, ok := obj.(metav1.Object) +// if !ok { +// a.logger.Warn("object sync: casting to metav1.Object failed") +// return nil +// } - return nil -} +// return nil +// } -func (a *Operator) deleteGCObject(obj interface{}) { +func (a *Operator) syncGCObject(obj interface{}) (syncError error) { metaObj, ok := obj.(metav1.Object) if !ok { a.logger.Warn("object sync: casting to metav1.Object failed") @@ -596,6 +596,27 @@ func (a *Operator) deleteGCObject(obj interface{}) { switch metaObj.(type) { case *corev1.ServiceAccount: + owners := ownerutil.GetOwnersByKind(metaObj, v1alpha1.ClusterServiceVersionKind) + if len(owners) > 0 && metaObj.GetNamespace() != metav1.NamespaceAll { + for _, ownerCSV := range owners { + // Since cross-namespace CSVs can't exist we're guaranteed the owner will be in the same namespace + _, err := a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(metaObj.GetNamespace()).Get(ownerCSV.Name) + if err == nil { + logger.Debugf("CSV still present, must wait until it is deleted (owners=%v)", owners) + syncError = fmt.Errorf("cleanup must wait") + return + } else if !k8serrors.IsNotFound(err) { + syncError = err + return + } + + if err != nil { + logger.Warn(err.Error()) + } + } + return + } + if err := a.opClient.DeleteServiceAccount(metaObj.GetNamespace(), metaObj.GetName(), &metav1.DeleteOptions{}); err != nil { logger.WithError(err).Warn("cannot delete service account") //syncError = err @@ -603,6 +624,24 @@ func (a *Operator) deleteGCObject(obj interface{}) { } logger.Debugf("Deleted apiservice %v due to no owning CSV", metaObj.GetName()) case *rbacv1.ClusterRole: + if name, ns, ok := ownerutil.GetOwnerByKindLabel(metaObj, v1alpha1.ClusterServiceVersionKind); ok { + _, err := a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(ns).Get(name) + if err == nil { + logger.Debugf("CSV still present, must wait until it is deleted (owners=%v/%v)", ns, name) + syncError = fmt.Errorf("cleanup must wait") + + // JPEELER delete this + csvs, err := a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(ns).List(labels.Everything()) + for _, item := range csvs { + logger.Debugf("Csv = %#v, err=%v", item, err) + } + return + } else if !k8serrors.IsNotFound(err) { + syncError = err + return + } + } + if err := a.opClient.DeleteClusterRole(metaObj.GetName(), &metav1.DeleteOptions{}); err != nil { logger.WithError(err).Warn("cannot delete cluster role") //syncError = err @@ -610,6 +649,18 @@ func (a *Operator) deleteGCObject(obj interface{}) { } logger.Debugf("Deleted cluster role %v due to no owning CSV", metaObj.GetName()) case *rbacv1.ClusterRoleBinding: + if name, ns, ok := ownerutil.GetOwnerByKindLabel(metaObj, v1alpha1.ClusterServiceVersionKind); ok { + _, err := a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(ns).Get(name) + if err == nil { + logger.Debugf("CSV still present, must wait until it is deleted (owners=%v)", name) + syncError = fmt.Errorf("cleanup must wait") + return + } else if !k8serrors.IsNotFound(err) { + syncError = err + return + } + } + if err := a.opClient.DeleteClusterRoleBinding(metaObj.GetName(), &metav1.DeleteOptions{}); err != nil { logger.WithError(err).Warn("cannot delete cluster role binding") //syncError = err @@ -731,6 +782,8 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) { "phase": clusterServiceVersion.Status.Phase, }) + logger.Debugf("JPEELER deleting CSV %#v", clusterServiceVersion) + defer func(csv v1alpha1.ClusterServiceVersion) { if clusterServiceVersion.IsCopied() { logger.Debug("deleted csv is copied. skipping operatorgroup requeue") @@ -823,7 +876,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) { logger.WithError(err).Warn("cannot list cluster role bindings") } for _, crb := range crbs { - syncError := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceDeleted, crb)) + syncError := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceUpdated, crb)) logger.Debugf("handleCSVdeletion- requeued delete event for %v, res=%v", crb, syncError) } @@ -832,7 +885,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) { logger.WithError(err).Warn("cannot list cluster roles") } for _, cr := range crs { - syncError := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceDeleted, cr)) + syncError := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceUpdated, cr)) logger.Debugf("handleCSVdeletion- requeued delete event for %v, res=%v", cr, syncError) } @@ -841,7 +894,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) { logger.WithError(err).Warn("cannot list service accounts") } for _, sa := range serviceAccounts { - syncError := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceDeleted, sa)) + syncError := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceUpdated, sa)) logger.Debugf("handleCSVdeletion- requeued delete event for %v, res=%v", sa, syncError) } @@ -1004,6 +1057,7 @@ func (a *Operator) syncClusterServiceVersion(obj interface{}) (syncError error) a.csvCopyQueueSet.Requeue(outCSV.GetNamespace(), outCSV.GetName()) } + logger.Debug("done syncing CSV") return } @@ -1788,8 +1842,13 @@ func (a *Operator) requeueOwnerCSVs(ownee metav1.Object) { owners := ownerutil.GetOwnersByKind(ownee, v1alpha1.ClusterServiceVersionKind) if len(owners) > 0 && ownee.GetNamespace() != metav1.NamespaceAll { for _, ownerCSV := range owners { + _, err := a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(ownee.GetNamespace()).Get(ownerCSV.Name) + if k8serrors.IsNotFound(err) { + logger.Debugf("skipping requeue since CSV %v is not in cache", ownerCSV.Name) + continue + } // Since cross-namespace CSVs can't exist we're guaranteed the owner will be in the same namespace - err := a.csvQueueSet.Requeue(ownee.GetNamespace(), ownerCSV.Name) + err = a.csvQueueSet.Requeue(ownee.GetNamespace(), ownerCSV.Name) if err != nil { logger.Warn(err.Error()) } @@ -1799,7 +1858,13 @@ func (a *Operator) requeueOwnerCSVs(ownee metav1.Object) { // Requeue owners based on labels if name, ns, ok := ownerutil.GetOwnerByKindLabel(ownee, v1alpha1.ClusterServiceVersionKind); ok { - err := a.csvQueueSet.Requeue(ns, name) + _, err := a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(ns).Get(name) + if k8serrors.IsNotFound(err) { + logger.Debugf("skipping requeue since CSV %v is not in cache", name) + return + } + + err = a.csvQueueSet.Requeue(ns, name) if err != nil { logger.Warn(err.Error()) } diff --git a/pkg/lib/queueinformer/queueinformer_operator.go b/pkg/lib/queueinformer/queueinformer_operator.go index 66c271d7ef..185c62e71f 100644 --- a/pkg/lib/queueinformer/queueinformer_operator.go +++ b/pkg/lib/queueinformer/queueinformer_operator.go @@ -247,17 +247,25 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) logger = logger.WithField("cache-key", key) - // Get the current cached version of the resource - resource, exists, err := loop.indexer.GetByKey(key) - if err != nil { - logger.WithError(err).Error("cache get failed") - queue.Forget(item) - return true - } - if !exists { - logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache") - queue.Forget(item) - return true + var resource interface{} + if loop.indexer == nil { + resource = event.Resource() + logger.Debugf("JPEELER: detected nil indexer, got resource %v", resource) + } else { + // Get the current cached version of the resource + var exists bool + var err error + resource, exists, err = loop.indexer.GetByKey(key) + if err != nil { + logger.WithError(err).Error("cache get failed") + queue.Forget(item) + return true + } + if !exists { + logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache") + queue.Forget(item) + return true + } } if !ok { @@ -267,6 +275,14 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) } } + key, keyable := loop.key(item) + if !keyable { + logger.WithField("item", item).Warn("tried to form key") + logger.Debugf("JPEELER: about to sync with event: %v", event) + } else { + logger.Debugf("JPEELER: about to sync with event: %v, key: %v", event, key) + } + // Sync and requeue on error (throw out failed deletion syncs) err := loop.Sync(ctx, event) if requeues := queue.NumRequeues(item); err != nil && requeues < 8 && event.Type() != kubestate.ResourceDeleted { diff --git a/pkg/lib/queueinformer/resourcequeue.go b/pkg/lib/queueinformer/resourcequeue.go index 47b81e55a5..163641a91b 100644 --- a/pkg/lib/queueinformer/resourcequeue.go +++ b/pkg/lib/queueinformer/resourcequeue.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" "sync" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/util/workqueue" @@ -39,7 +40,8 @@ func (r *ResourceQueueSet) RequeueEvent(namespace string, resourceEvent kubestat defer r.mutex.RUnlock() if queue, ok := r.queueSet[namespace]; ok { - queue.Add(resourceEvent) + //queue.Add(resourceEvent) + queue.AddAfter(resourceEvent, 15*time.Second) return nil } diff --git a/scenario.sh b/scenario.sh new file mode 100755 index 0000000000..589d402606 --- /dev/null +++ b/scenario.sh @@ -0,0 +1,78 @@ +#!/bin/bash + +namespace=local +csv_name=etcdoperator.v0.9.4-clusterwide +delay=5 + + +function show_resources_simple() { + echo "--- Showing resources of interest" + echo "Listing cluster roles:" + kubectl get clusterrole | grep etcd + echo "Listing cluster roles bindings:" + kubectl get clusterrolebinding | grep etcd + echo "Listing service accounts:" + kubectl get sa --all-namespaces | grep etcd + echo "---" +} + +function show_resources() { + echo "--- Showing detailed resources of interest" + echo "Listing cluster roles:" + for item in $(kubectl get clusterrole -o=name | grep etcd); do + kubectl get "$item" -o jsonpath='{.metadata}' + echo "" + done + + echo "Listing cluster roles bindings:" + for item in $(kubectl get clusterrolebinding -o=name | grep etcd); do + kubectl get "$item" -o jsonpath='{.metadata}' + echo "" + done + + echo "Listing service accounts:" + for item in $(kubectl get sa --all-namespaces -o=name | grep etcd); do + kubectl get "$item" -o jsonpath='{.metadata}' + echo "" + done + + echo "---" + #k get clusterrolebinding -o jsonpath='{range .items[*]}{.metadata}{"\n"}{end}' +} + +cat </dev/null || echo "Waiting for CSV to appear") + if [[ $new_csv_phase != "$csv_phase" ]]; then + csv_phase=$new_csv_phase + echo "CSV phase: $csv_phase" + fi + sleep 1 + retries=$((retries - 1)) +done + +echo "Sleeping for $delay seconds to let OLM settle" +sleep $delay + +show_resources + +echo "Deleting subscription and CSV" +kubectl delete subscription -n $namespace etcd +kubectl delete csv -n $namespace $csv_name +echo "$csv_name deleted" +sleep $((delay*3)) + +show_resources