Skip to content

Commit

Permalink
Merge pull request #878 from dinhxuanvu/update-crd-one
Browse files Browse the repository at this point in the history
Enable OLM to update CRD when there is only one owner of that CRD
  • Loading branch information
openshift-merge-robot authored Jun 20, 2019
2 parents 0b23b36 + 930163c commit b234fe7
Show file tree
Hide file tree
Showing 3 changed files with 470 additions and 26 deletions.
94 changes: 68 additions & 26 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"reflect"
"sync"
"time"

Expand All @@ -14,6 +15,7 @@ import (
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
v1beta1ext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
extinf "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -32,6 +34,7 @@ import (
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/subscription"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver"
index "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/index"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
Expand All @@ -56,21 +59,22 @@ const (
type Operator struct {
queueinformer.Operator

logger *logrus.Logger
clock utilclock.Clock
opClient operatorclient.ClientInterface
client versioned.Interface
lister operatorlister.OperatorLister
catsrcQueueSet *queueinformer.ResourceQueueSet
subQueueSet *queueinformer.ResourceQueueSet
ipQueueSet *queueinformer.ResourceQueueSet
nsResolveQueue workqueue.RateLimitingInterface
namespace string
sources map[resolver.CatalogKey]resolver.SourceRef
sourcesLock sync.RWMutex
sourcesLastUpdate metav1.Time
resolver resolver.Resolver
reconciler reconciler.RegistryReconcilerFactory
logger *logrus.Logger
clock utilclock.Clock
opClient operatorclient.ClientInterface
client versioned.Interface
lister operatorlister.OperatorLister
catsrcQueueSet *queueinformer.ResourceQueueSet
subQueueSet *queueinformer.ResourceQueueSet
ipQueueSet *queueinformer.ResourceQueueSet
nsResolveQueue workqueue.RateLimitingInterface
namespace string
sources map[resolver.CatalogKey]resolver.SourceRef
sourcesLock sync.RWMutex
sourcesLastUpdate metav1.Time
resolver resolver.Resolver
reconciler reconciler.RegistryReconcilerFactory
csvProvidedAPIsIndexer map[string]cache.Indexer
}

// NewOperator creates a new Catalog Operator.
Expand Down Expand Up @@ -98,17 +102,18 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo

// Allocate the new instance of an Operator.
op := &Operator{
Operator: queueOperator,
logger: logger,
clock: clock,
opClient: opClient,
client: crClient,
lister: lister,
namespace: operatorNamespace,
sources: make(map[resolver.CatalogKey]resolver.SourceRef),
resolver: resolver.NewOperatorsV1alpha1Resolver(lister),
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
Operator: queueOperator,
logger: logger,
clock: clock,
opClient: opClient,
client: crClient,
lister: lister,
namespace: operatorNamespace,
sources: make(map[resolver.CatalogKey]resolver.SourceRef),
resolver: resolver.NewOperatorsV1alpha1Resolver(lister),
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
csvProvidedAPIsIndexer: map[string]cache.Indexer{},
}
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now)

Expand All @@ -122,6 +127,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, csvInformer.Lister())
op.RegisterInformer(csvInformer.Informer())

csvInformer.Informer().AddIndexers(cache.Indexers{index.ProvidedAPIsIndexFuncKey: index.ProvidedAPIsIndexFunc})
csvIndexer := csvInformer.Informer().GetIndexer()
op.csvProvidedAPIsIndexer[namespace] = csvIndexer

// TODO: Add namespace resolve sync

// Wire InstallPlans
Expand Down Expand Up @@ -244,6 +253,20 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo

}

// Register CustomResourceDefinition QueueInformer
crdInformer := extinf.NewSharedInformerFactory(op.opClient.ApiextensionsV1beta1Interface(), resyncPeriod).Apiextensions().V1beta1().CustomResourceDefinitions()
op.lister.APIExtensionsV1beta1().RegisterCustomResourceDefinitionLister(crdInformer.Lister())
crdQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithLogger(op.logger),
queueinformer.WithInformer(crdInformer.Informer()),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)),
)
if err != nil {
return nil, err
}
op.RegisterQueueInformer(crdQueueInformer)

// Namespace sync for resolving subscriptions
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod).Core().V1().Namespaces()
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
Expand Down Expand Up @@ -1034,6 +1057,25 @@ func (o *Operator) ExecutePlan(plan *v1alpha1.InstallPlan) error {
// Attempt to create the CRD.
_, err = o.opClient.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Create(&crd)
if k8serrors.IsAlreadyExists(err) {
currentCRD, _ := o.lister.APIExtensionsV1beta1().CustomResourceDefinitionLister().Get(crd.GetName())
// Compare 2 CRDs to see if it needs to be updatetd
if !reflect.DeepEqual(crd, *currentCRD) {
// Verify CRD ownership, only attempt to update if
// CRD has only one owner
// Example: provided=database.coreos.com/v1alpha1/EtcdCluster
matchedCSV, err := index.CRDProviderNames(o.csvProvidedAPIsIndexer, crd)
if err != nil {
return errorwrap.Wrapf(err, "error find matched CSV: %s", step.Resource.Name)
}
if len(matchedCSV) == 1 {
// Attempt to update CRD
crd.SetResourceVersion(currentCRD.GetResourceVersion())
_, err = o.opClient.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Update(&crd)
if err != nil {
return errorwrap.Wrapf(err, "error update CRD: %s", step.Resource.Name)
}
}
}
// If it already existed, mark the step as Present.
plan.Status.Plan[i].Status = v1alpha1.StepStatusPresent
continue
Expand Down
67 changes: 67 additions & 0 deletions pkg/lib/index/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package indexer

import (
"fmt"
"strings"

"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1"
v1beta1ext "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/client-go/tools/cache"
)

const (
// ProvidedAPIsIndexFuncKey is the recommended key to use for registering the index func with an indexer.
ProvidedAPIsIndexFuncKey string = "providedAPIs"
)

// ProvidedAPIsIndexFunc returns indicies from the owned CRDs and APIs of the given object (CSV)
func ProvidedAPIsIndexFunc(obj interface{}) ([]string, error) {
indicies := []string{}

csv, ok := obj.(*v1alpha1.ClusterServiceVersion)
if !ok {
return indicies, fmt.Errorf("invalid object of type: %T", obj)
}

for _, crd := range csv.Spec.CustomResourceDefinitions.Owned {
parts := strings.SplitN(crd.Name, ".", 2)
if len(parts) < 2 {
return indicies, fmt.Errorf("couldn't parse plural.group from crd name: %s", crd.Name)
}
indicies = append(indicies, fmt.Sprintf("%s/%s/%s", parts[1], crd.Version, crd.Kind))
}
for _, api := range csv.Spec.APIServiceDefinitions.Owned {
indicies = append(indicies, fmt.Sprintf("%s/%s/%s", api.Group, api.Version, api.Kind))
}

return indicies, nil
}

// CRDProviderNames returns the names of CSVs that own the given CRD
func CRDProviderNames(indexers map[string]cache.Indexer, crd v1beta1ext.CustomResourceDefinition) (map[string]struct{}, error) {
csvSet := map[string]struct{}{}
crdSpec := map[string]struct{}{}
for _, v := range crd.Spec.Versions {
crdSpec[fmt.Sprintf("%s/%s/%s", crd.Spec.Group, v.Name, crd.Spec.Names.Kind)] = struct{}{}
}
if crd.Spec.Version != "" {
crdSpec[fmt.Sprintf("%s/%s/%s", crd.Spec.Group, crd.Spec.Version, crd.Spec.Names.Kind)] = struct{}{}
}
for _, indexer := range indexers {
for key := range crdSpec {
csvs, err := indexer.ByIndex(ProvidedAPIsIndexFuncKey, key)
if err != nil {
return nil, err
}
for _, item := range csvs {
csv, ok := item.(*v1alpha1.ClusterServiceVersion)
if !ok {
continue
}
// Add to set
csvSet[csv.GetName()] = struct{}{}
}
}
}
return csvSet, nil
}
Loading

0 comments on commit b234fe7

Please sign in to comment.