Skip to content

Commit

Permalink
success!
Browse files Browse the repository at this point in the history
now just need to clean it all up
  • Loading branch information
Jeff Peeler committed Aug 10, 2019
1 parent 8ffdb5b commit fc43c40
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 28 deletions.
97 changes: 81 additions & 16 deletions pkg/controller/operators/olm/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
ctx,
queueinformer.WithLogger(op.logger),
queueinformer.WithQueue(objGCQueue),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncGCObject).ToSyncerWithDelete(op.deleteGCObject)),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncGCObject).ToSyncer()),
)
if err != nil {
return nil, err
Expand All @@ -348,7 +348,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
ctx,
queueinformer.WithLogger(op.logger),
queueinformer.WithQueue(objGCQueue),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncGCObject).ToSyncerWithDelete(op.deleteGCObject)),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncGCObject).ToSyncer()),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -572,17 +572,17 @@ func (a *Operator) RegisterCSVWatchNotification(csvNotification csvutility.Watch
a.csvNotification = csvNotification
}

func (a *Operator) syncGCObject(obj interface{}) error {
_, ok := obj.(metav1.Object)
if !ok {
a.logger.Warn("object sync: casting to metav1.Object failed")
return nil
}
// func (a *Operator) syncGCObject(obj interface{}) error {
// _, ok := obj.(metav1.Object)
// if !ok {
// a.logger.Warn("object sync: casting to metav1.Object failed")
// return nil
// }

return nil
}
// return nil
// }

func (a *Operator) deleteGCObject(obj interface{}) {
func (a *Operator) syncGCObject(obj interface{}) (syncError error) {
metaObj, ok := obj.(metav1.Object)
if !ok {
a.logger.Warn("object sync: casting to metav1.Object failed")
Expand All @@ -596,20 +596,71 @@ func (a *Operator) deleteGCObject(obj interface{}) {

switch metaObj.(type) {
case *corev1.ServiceAccount:
owners := ownerutil.GetOwnersByKind(metaObj, v1alpha1.ClusterServiceVersionKind)
if len(owners) > 0 && metaObj.GetNamespace() != metav1.NamespaceAll {
for _, ownerCSV := range owners {
// Since cross-namespace CSVs can't exist we're guaranteed the owner will be in the same namespace
_, err := a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(metaObj.GetNamespace()).Get(ownerCSV.Name)
if err == nil {
logger.Debugf("CSV still present, must wait until it is deleted (owners=%v)", owners)
syncError = fmt.Errorf("cleanup must wait")
return
} else if !k8serrors.IsNotFound(err) {
syncError = err
return
}

if err != nil {
logger.Warn(err.Error())
}
}
return
}

if err := a.opClient.DeleteServiceAccount(metaObj.GetNamespace(), metaObj.GetName(), &metav1.DeleteOptions{}); err != nil {
logger.WithError(err).Warn("cannot delete service account")
//syncError = err
break
}
logger.Debugf("Deleted apiservice %v due to no owning CSV", metaObj.GetName())
case *rbacv1.ClusterRole:
if name, ns, ok := ownerutil.GetOwnerByKindLabel(metaObj, v1alpha1.ClusterServiceVersionKind); ok {
_, err := a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(ns).Get(name)
if err == nil {
logger.Debugf("CSV still present, must wait until it is deleted (owners=%v/%v)", ns, name)
syncError = fmt.Errorf("cleanup must wait")

// JPEELER delete this
csvs, err := a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(ns).List(labels.Everything())
for _, item := range csvs {
logger.Debugf("Csv = %#v, err=%v", item, err)
}
return
} else if !k8serrors.IsNotFound(err) {
syncError = err
return
}
}

if err := a.opClient.DeleteClusterRole(metaObj.GetName(), &metav1.DeleteOptions{}); err != nil {
logger.WithError(err).Warn("cannot delete cluster role")
//syncError = err
break
}
logger.Debugf("Deleted cluster role %v due to no owning CSV", metaObj.GetName())
case *rbacv1.ClusterRoleBinding:
if name, ns, ok := ownerutil.GetOwnerByKindLabel(metaObj, v1alpha1.ClusterServiceVersionKind); ok {
_, err := a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(ns).Get(name)
if err == nil {
logger.Debugf("CSV still present, must wait until it is deleted (owners=%v)", name)
syncError = fmt.Errorf("cleanup must wait")
return
} else if !k8serrors.IsNotFound(err) {
syncError = err
return
}
}

if err := a.opClient.DeleteClusterRoleBinding(metaObj.GetName(), &metav1.DeleteOptions{}); err != nil {
logger.WithError(err).Warn("cannot delete cluster role binding")
//syncError = err
Expand Down Expand Up @@ -731,6 +782,8 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) {
"phase": clusterServiceVersion.Status.Phase,
})

logger.Debugf("JPEELER deleting CSV %#v", clusterServiceVersion)

defer func(csv v1alpha1.ClusterServiceVersion) {
if clusterServiceVersion.IsCopied() {
logger.Debug("deleted csv is copied. skipping operatorgroup requeue")
Expand Down Expand Up @@ -823,7 +876,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) {
logger.WithError(err).Warn("cannot list cluster role bindings")
}
for _, crb := range crbs {
syncError := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceDeleted, crb))
syncError := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceUpdated, crb))
logger.Debugf("handleCSVdeletion- requeued delete event for %v, res=%v", crb, syncError)
}

Expand All @@ -832,7 +885,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) {
logger.WithError(err).Warn("cannot list cluster roles")
}
for _, cr := range crs {
syncError := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceDeleted, cr))
syncError := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceUpdated, cr))
logger.Debugf("handleCSVdeletion- requeued delete event for %v, res=%v", cr, syncError)
}

Expand All @@ -841,7 +894,7 @@ func (a *Operator) handleClusterServiceVersionDeletion(obj interface{}) {
logger.WithError(err).Warn("cannot list service accounts")
}
for _, sa := range serviceAccounts {
syncError := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceDeleted, sa))
syncError := a.objGCQueueSet.RequeueEvent("", kubestate.NewResourceEvent(kubestate.ResourceUpdated, sa))
logger.Debugf("handleCSVdeletion- requeued delete event for %v, res=%v", sa, syncError)
}

Expand Down Expand Up @@ -1004,6 +1057,7 @@ func (a *Operator) syncClusterServiceVersion(obj interface{}) (syncError error)
a.csvCopyQueueSet.Requeue(outCSV.GetNamespace(), outCSV.GetName())
}

logger.Debug("done syncing CSV")
return
}

Expand Down Expand Up @@ -1788,8 +1842,13 @@ func (a *Operator) requeueOwnerCSVs(ownee metav1.Object) {
owners := ownerutil.GetOwnersByKind(ownee, v1alpha1.ClusterServiceVersionKind)
if len(owners) > 0 && ownee.GetNamespace() != metav1.NamespaceAll {
for _, ownerCSV := range owners {
_, err := a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(ownee.GetNamespace()).Get(ownerCSV.Name)
if k8serrors.IsNotFound(err) {
logger.Debugf("skipping requeue since CSV %v is not in cache", ownerCSV.Name)
continue
}
// Since cross-namespace CSVs can't exist we're guaranteed the owner will be in the same namespace
err := a.csvQueueSet.Requeue(ownee.GetNamespace(), ownerCSV.Name)
err = a.csvQueueSet.Requeue(ownee.GetNamespace(), ownerCSV.Name)
if err != nil {
logger.Warn(err.Error())
}
Expand All @@ -1799,7 +1858,13 @@ func (a *Operator) requeueOwnerCSVs(ownee metav1.Object) {

// Requeue owners based on labels
if name, ns, ok := ownerutil.GetOwnerByKindLabel(ownee, v1alpha1.ClusterServiceVersionKind); ok {
err := a.csvQueueSet.Requeue(ns, name)
_, err := a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(ns).Get(name)
if k8serrors.IsNotFound(err) {
logger.Debugf("skipping requeue since CSV %v is not in cache", name)
return
}

err = a.csvQueueSet.Requeue(ns, name)
if err != nil {
logger.Warn(err.Error())
}
Expand Down
38 changes: 27 additions & 11 deletions pkg/lib/queueinformer/queueinformer_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,17 +247,25 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)

logger = logger.WithField("cache-key", key)

// Get the current cached version of the resource
resource, exists, err := loop.indexer.GetByKey(key)
if err != nil {
logger.WithError(err).Error("cache get failed")
queue.Forget(item)
return true
}
if !exists {
logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache")
queue.Forget(item)
return true
var resource interface{}
if loop.indexer == nil {
resource = event.Resource()
logger.Debugf("JPEELER: detected nil indexer, got resource %v", resource)
} else {
// Get the current cached version of the resource
var exists bool
var err error
resource, exists, err = loop.indexer.GetByKey(key)
if err != nil {
logger.WithError(err).Error("cache get failed")
queue.Forget(item)
return true
}
if !exists {
logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache")
queue.Forget(item)
return true
}
}

if !ok {
Expand All @@ -267,6 +275,14 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
}
}

key, keyable := loop.key(item)
if !keyable {
logger.WithField("item", item).Warn("tried to form key")
logger.Debugf("JPEELER: about to sync with event: %v", event)
} else {
logger.Debugf("JPEELER: about to sync with event: %v, key: %v", event, key)
}

// Sync and requeue on error (throw out failed deletion syncs)
err := loop.Sync(ctx, event)
if requeues := queue.NumRequeues(item); err != nil && requeues < 8 && event.Type() != kubestate.ResourceDeleted {
Expand Down
4 changes: 3 additions & 1 deletion pkg/lib/queueinformer/resourcequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strings"
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -39,7 +40,8 @@ func (r *ResourceQueueSet) RequeueEvent(namespace string, resourceEvent kubestat
defer r.mutex.RUnlock()

if queue, ok := r.queueSet[namespace]; ok {
queue.Add(resourceEvent)
//queue.Add(resourceEvent)
queue.AddAfter(resourceEvent, 15*time.Second)
return nil
}

Expand Down
78 changes: 78 additions & 0 deletions scenario.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#!/bin/bash

namespace=local
csv_name=etcdoperator.v0.9.4-clusterwide
delay=5


function show_resources_simple() {
echo "--- Showing resources of interest"
echo "Listing cluster roles:"
kubectl get clusterrole | grep etcd
echo "Listing cluster roles bindings:"
kubectl get clusterrolebinding | grep etcd
echo "Listing service accounts:"
kubectl get sa --all-namespaces | grep etcd
echo "---"
}

function show_resources() {
echo "--- Showing detailed resources of interest"
echo "Listing cluster roles:"
for item in $(kubectl get clusterrole -o=name | grep etcd); do
kubectl get "$item" -o jsonpath='{.metadata}'
echo ""
done

echo "Listing cluster roles bindings:"
for item in $(kubectl get clusterrolebinding -o=name | grep etcd); do
kubectl get "$item" -o jsonpath='{.metadata}'
echo ""
done

echo "Listing service accounts:"
for item in $(kubectl get sa --all-namespaces -o=name | grep etcd); do
kubectl get "$item" -o jsonpath='{.metadata}'
echo ""
done

echo "---"
#k get clusterrolebinding -o jsonpath='{range .items[*]}{.metadata}{"\n"}{end}'
}

cat <<EOF | kubectl apply -f -
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
name: etcd
namespace: $namespace
spec:
channel: clusterwide-alpha
name: etcd
source: operatorhubio-catalog
sourceNamespace: local
EOF

retries=50
until [[ $retries == 0 || $new_csv_phase == "Succeeded" ]]; do
new_csv_phase=$(kubectl get csv -n "${namespace}" $csv_name -o jsonpath='{.status.phase}' 2>/dev/null || echo "Waiting for CSV to appear")
if [[ $new_csv_phase != "$csv_phase" ]]; then
csv_phase=$new_csv_phase
echo "CSV phase: $csv_phase"
fi
sleep 1
retries=$((retries - 1))
done

echo "Sleeping for $delay seconds to let OLM settle"
sleep $delay

show_resources

echo "Deleting subscription and CSV"
kubectl delete subscription -n $namespace etcd
kubectl delete csv -n $namespace $csv_name
echo "$csv_name deleted"
sleep $((delay*3))

show_resources

0 comments on commit fc43c40

Please sign in to comment.