Skip to content

Commit

Permalink
Merge pull request #750 from jpeeler/apiservice-sync
Browse files Browse the repository at this point in the history
fix(olm): add deletion monitoring for api services
  • Loading branch information
openshift-merge-robot authored May 21, 2019
2 parents f144c0f + fd97525 commit f370461
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 6 deletions.
53 changes: 49 additions & 4 deletions pkg/controller/operators/olm/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
kagg "k8s.io/kube-aggregator/pkg/client/informers/externalversions"

"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1"
Expand Down Expand Up @@ -53,6 +54,7 @@ type Operator struct {
*queueinformer.Operator
csvQueueSet *queueinformer.ResourceQueueSet
ogQueueSet *queueinformer.ResourceQueueSet
apiSvcQueue workqueue.RateLimitingInterface
client versioned.Interface
resolver install.StrategyResolverInterface
apiReconciler resolver.APIIntersectionReconciler
Expand Down Expand Up @@ -166,17 +168,20 @@ func NewOperator(logger *logrus.Logger, crClient versioned.Interface, opClient o

// Register APIService QueueInformer
apiServiceInformer := kagg.NewSharedInformerFactory(opClient.ApiregistrationV1Interface(), wakeupInterval).Apiregistration().V1().APIServices()
op.RegisterQueueInformer(queueinformer.NewInformer(
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "apiservices"),
apiServiceQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "apiservices")
apiServiceQueueInformer := queueinformer.NewInformer(
apiServiceQueue,
apiServiceInformer.Informer(),
op.syncObject,
op.syncAPIService,
&cache.ResourceEventHandlerFuncs{
DeleteFunc: op.handleDeletion,
},
"apiservices",
metrics.NewMetricsNil(),
logger,
))
)
op.RegisterQueueInformer(apiServiceQueueInformer)
op.apiSvcQueue = apiServiceQueue
op.lister.APIRegistrationV1().RegisterAPIServiceLister(apiServiceInformer.Lister())

// Register CustomResourceDefinition QueueInformer
Expand Down Expand Up @@ -307,6 +312,46 @@ func NewOperator(logger *logrus.Logger, crClient versioned.Interface, opClient o
return op, nil
}

func (a *Operator) syncAPIService(obj interface{}) (syncError error) {
apiSvc, ok := obj.(*apiregistrationv1.APIService)
if !ok {
a.Log.Debugf("wrong type: %#v", obj)
return fmt.Errorf("casting APIService failed")
}

logger := a.Log.WithFields(logrus.Fields{
"id": queueinformer.NewLoopID(),
"apiSvc": apiSvc.GetName(),
})
logger.Info("syncing APIService")

if name, ns, ok := ownerutil.GetOwnerByKindLabel(apiSvc, v1alpha1.ClusterServiceVersionKind); ok {
_, err := a.lister.CoreV1().NamespaceLister().Get(ns)
if k8serrors.IsNotFound(err) {
logger.Debug("Deleting api service since owning namespace is not found")
syncError = a.OpClient.DeleteAPIService(apiSvc.GetName(), &metav1.DeleteOptions{})
return
}

_, err = a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(ns).Get(name)
if k8serrors.IsNotFound(err) {
logger.Debug("Deleting api service since owning CSV is not found")
syncError = a.OpClient.DeleteAPIService(apiSvc.GetName(), &metav1.DeleteOptions{})
return
} else if err != nil {
syncError = err
return
} else {
if ownerutil.IsOwnedByKindLabel(apiSvc, v1alpha1.ClusterServiceVersionKind) {
logger.Debug("requeueing owner CSVs")
a.requeueOwnerCSVs(apiSvc)
}
}
}

return nil
}

func (a *Operator) syncObject(obj interface{}) (syncError error) {
// Assert as metav1.Object
metaObj, ok := obj.(metav1.Object)
Expand Down
95 changes: 93 additions & 2 deletions test/e2e/csv_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -17,6 +18,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"

v1 "github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1"
Expand Down Expand Up @@ -1245,9 +1248,31 @@ func TestCreateCSVWithOwnedAPIService(t *testing.T) {
csv.SetName(depName)

// Create the APIService CSV
cleanupCSV, err := createCSV(t, c, crc, csv, testNamespace, false, true)
cleanupCSV, err := createCSV(t, c, crc, csv, testNamespace, false, false)
require.NoError(t, err)
defer cleanupCSV()
defer func() {
watcher, err := c.ApiregistrationV1Interface().ApiregistrationV1().APIServices().Watch(metav1.ListOptions{FieldSelector: "metadata.name=" + apiServiceName})
require.NoError(t, err)

deleted := make(chan struct{})
go func() {
events := watcher.ResultChan()
for {
select {
case evt := <-events:
if evt.Type == watch.Deleted {
deleted <- struct{}{}
return
}
case <-time.After(pollDuration):
require.FailNow(t, "apiservice not cleaned up after CSV deleted")
}
}
}()

cleanupCSV()
<-deleted
}()

fetchedCSV, err := fetchCSV(t, crc, csv.Name, testNamespace, csvSucceededChecker)
require.NoError(t, err)
Expand Down Expand Up @@ -1748,6 +1773,72 @@ func TestCreateSameCSVWithOwnedAPIServiceMultiNamespace(t *testing.T) {
require.NoError(t, err)
}

func TestOrphanedAPIServiceCleanUp(t *testing.T) {
defer cleaner.NotifyTestComplete(t, true)

c := newKubeClient(t)

mockGroup := fmt.Sprintf("hats.%s.redhat.com", genName(""))
version := "v1alpha1"
apiServiceName := strings.Join([]string{version, mockGroup}, ".")

apiService := &apiregistrationv1.APIService{
ObjectMeta: metav1.ObjectMeta{
Name: apiServiceName,
},
Spec: apiregistrationv1.APIServiceSpec{
Group: mockGroup,
Version: version,
GroupPriorityMinimum: 100,
VersionPriority: 100,
},
}

watcher, err := c.ApiregistrationV1Interface().ApiregistrationV1().APIServices().Watch(metav1.ListOptions{FieldSelector: "metadata.name=" + apiServiceName})
require.NoError(t, err)

deleted := make(chan struct{})
quit := make(chan struct{})
defer close(quit)
go func() {
events := watcher.ResultChan()
for {
select {
case <-quit:
return
case evt := <-events:
if evt.Type == watch.Deleted {
deleted <- struct{}{}
}
case <-time.After(pollDuration):
require.FailNow(t, "orphaned apiservice not cleaned up as expected")
}
}
}()

_, err = c.CreateAPIService(apiService)
require.NoError(t, err, "error creating expected APIService")
orphanedAPISvc, err := c.GetAPIService(apiServiceName)
require.NoError(t, err, "error getting expected APIService")

newLabels := map[string]string{"olm.owner": "hat-serverfd4r5", "olm.owner.kind": "ClusterServiceVersion", "olm.owner.namespace": "nonexistent-namespace"}
orphanedAPISvc.SetLabels(newLabels)
_, err = c.UpdateAPIService(orphanedAPISvc)
require.NoError(t, err, "error updating APIService")
<-deleted

_, err = c.CreateAPIService(apiService)
require.NoError(t, err, "error creating expected APIService")
orphanedAPISvc, err = c.GetAPIService(apiServiceName)
require.NoError(t, err, "error getting expected APIService")

newLabels = map[string]string{"olm.owner": "hat-serverfd4r5", "olm.owner.kind": "ClusterServiceVersion", "olm.owner.namespace": testNamespace}
orphanedAPISvc.SetLabels(newLabels)
_, err = c.UpdateAPIService(orphanedAPISvc)
require.NoError(t, err, "error updating APIService")
<-deleted
}

func TestUpdateCSVSameDeploymentName(t *testing.T) {
defer cleaner.NotifyTestComplete(t, true)

Expand Down

0 comments on commit f370461

Please sign in to comment.