Skip to content

Commit

Permalink
Add IndexFunc using ProvidedAPIs as key to look up CSVs
Browse files Browse the repository at this point in the history
Signed-off-by: Vu Dinh <[email protected]>
  • Loading branch information
dinhxuanvu committed Jun 4, 2019
1 parent 07dd944 commit b8951c6
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 23 deletions.
58 changes: 37 additions & 21 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
olmerrors "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/errors"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver"
index "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/index"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
Expand All @@ -57,17 +58,18 @@ var timeNow = func() metav1.Time { return metav1.NewTime(time.Now().UTC()) }
// resolving dependencies in a catalog.
type Operator struct {
*queueinformer.Operator
client versioned.Interface
lister operatorlister.OperatorLister
namespace string
sources map[resolver.CatalogKey]resolver.SourceRef
sourcesLock sync.RWMutex
sourcesLastUpdate metav1.Time
resolver resolver.Resolver
subQueue workqueue.RateLimitingInterface
catSrcQueueSet *queueinformer.ResourceQueueSet
namespaceResolveQueue workqueue.RateLimitingInterface
reconciler reconciler.RegistryReconcilerFactory
client versioned.Interface
lister operatorlister.OperatorLister
namespace string
sources map[resolver.CatalogKey]resolver.SourceRef
sourcesLock sync.RWMutex
sourcesLastUpdate metav1.Time
resolver resolver.Resolver
subQueue workqueue.RateLimitingInterface
catSrcQueueSet *queueinformer.ResourceQueueSet
namespaceResolveQueue workqueue.RateLimitingInterface
reconciler reconciler.RegistryReconcilerFactory
csvProvidedAPIsIndexer map[string]cache.Indexer
}

// NewOperator creates a new Catalog Operator.
Expand All @@ -85,6 +87,8 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti

// Create an OperatorLister
lister := operatorlister.NewLister()
// Create csv provided APIs index map
csvProvidedAPIsIndexer := map[string]cache.Indexer{}

// Create an informer for each watched namespace.
ipSharedIndexInformers := []cache.SharedIndexInformer{}
Expand All @@ -94,11 +98,16 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti
nsInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(crClient, wakeupInterval, externalversions.WithNamespace(namespace))
ipSharedIndexInformers = append(ipSharedIndexInformers, nsInformerFactory.Operators().V1alpha1().InstallPlans().Informer())
subSharedIndexInformers = append(subSharedIndexInformers, nsInformerFactory.Operators().V1alpha1().Subscriptions().Informer())
csvSharedIndexInformers = append(csvSharedIndexInformers, nsInformerFactory.Operators().V1alpha1().ClusterServiceVersions().Informer())

csvInformer := nsInformerFactory.Operators().V1alpha1().ClusterServiceVersions()
lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, nsInformerFactory.Operators().V1alpha1().ClusterServiceVersions().Lister())
// Add new index func for provided APIS
csvInformer.Informer().AddIndexers(cache.Indexers{index.ProvidedAPIsIndexFuncKey: index.ProvidedAPIsIndexFunc})
csvSharedIndexInformers = append(csvSharedIndexInformers, csvInformer.Informer())
csvProvidedAPIsIndexer[namespace] = csvInformer.Informer().GetIndexer()

// resolver needs subscription and csv listers
lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, nsInformerFactory.Operators().V1alpha1().Subscriptions().Lister())
lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, nsInformerFactory.Operators().V1alpha1().ClusterServiceVersions().Lister())
lister.OperatorsV1alpha1().RegisterInstallPlanLister(namespace, nsInformerFactory.Operators().V1alpha1().InstallPlans().Lister())
}

Expand All @@ -110,13 +119,14 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti

// Allocate the new instance of an Operator.
op := &Operator{
Operator: queueOperator,
catSrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
client: crClient,
lister: lister,
namespace: operatorNamespace,
sources: make(map[resolver.CatalogKey]resolver.SourceRef),
resolver: resolver.NewOperatorsV1alpha1Resolver(lister),
Operator: queueOperator,
catSrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
client: crClient,
lister: lister,
namespace: operatorNamespace,
sources: make(map[resolver.CatalogKey]resolver.SourceRef),
resolver: resolver.NewOperatorsV1alpha1Resolver(lister),
csvProvidedAPIsIndexer: csvProvidedAPIsIndexer,
}

// Create an informer for each catalog namespace
Expand Down Expand Up @@ -1062,7 +1072,13 @@ func (o *Operator) ExecutePlan(plan *v1alpha1.InstallPlan) error {
if !reflect.DeepEqual(crd, *currentCRD) {
// Verify CRD ownership, only attempt to update if
// CRD has only one owner
if len(existingCRDOwners[currentCRD.GetName()]) == 1 {
// Example: provided=database.coreos.com/v1alpha1/EtcdCluster
apiKey := fmt.Sprintf("provided=%s/%s", crd.APIVersion, crd.Kind)
matchedCSV, err := index.APIsIndexValues(o.csvProvidedAPIsIndexer, apiKey)
if err != nil {
return errorwrap.Wrapf(err, "error find matched CSV: %s", step.Resource.Name)
}
if len(matchedCSV) == 1 {
// Attempt to update CRD
_, err = o.OpClient.ApiextensionsV1beta1Interface().ApiextensionsV1beta1().CustomResourceDefinitions().Update(&crd)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/operators/olm/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/certs"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver"
csvutility "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/csv"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/event"
index "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/index"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/labeler"
Expand All @@ -36,7 +37,6 @@ import (
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
"github.com/operator-framework/operator-lifecycle-manager/pkg/metrics"
csvutility "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/csv"
)

var (
Expand Down Expand Up @@ -65,7 +65,7 @@ type Operator struct {
gcQueueIndexer *queueinformer.QueueIndexer
apiLabeler labeler.Labeler
csvIndexers map[string]cache.Indexer
csvSetGenerator csvutility.SetGenerator
csvSetGenerator csvutility.SetGenerator
csvReplaceFinder csvutility.ReplaceFinder
}

Expand Down
58 changes: 58 additions & 0 deletions pkg/lib/index/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package indexer

import (
"fmt"
"strings"

"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1"
"k8s.io/client-go/tools/cache"
)

const (
// ProvidedAPIsIndexFuncKey is the recommended key to use for registering the index func with an indexer.
ProvidedAPIsIndexFuncKey string = "providedAPIs"
)

func ProvidedAPIsIndexFunc(obj interface{}) ([]string, error) {
indicies := []string{}

csv, ok := obj.(*v1alpha1.ClusterServiceVersion)
if !ok {
return indicies, fmt.Errorf("invalid object of type: %T", obj)
}

for _, crd := range csv.Spec.CustomResourceDefinitions.Owned {
parts := strings.SplitN(crd.Name, ".", 2)
if len(parts) < 2 {
return indicies, fmt.Errorf("couldn't parse plural.group from crd name: %s", crd.Name)
}
indicies = append(indicies, fmt.Sprintf("provided=%s/%s/%s", parts[1], crd.Version, crd.Kind))
}
for _, api := range csv.Spec.APIServiceDefinitions.Owned {
indicies = append(indicies, fmt.Sprintf("provided=%s/%s/%s", api.Group, api.Version, api.Kind))
}

return indicies, nil
}

// APIsIndexValues returns the names of CSVs that own the given CRD
func APIsIndexValues(indexers map[string]cache.Indexer, apiKey string) (map[string]struct{}, error) {
csvSet := map[string]struct{}{}
for _, indexer := range indexers {

csvs, err := indexer.ByIndex(ProvidedAPIsIndexFuncKey, apiKey)
if err != nil {
return nil, err
}
for _, csv := range csvs {
csv, ok := csv.(*v1alpha1.ClusterServiceVersion)
if !ok {
continue
}
// Add to set
csvSet[csv.GetName()] = struct{}{}
}

}
return csvSet, nil
}

0 comments on commit b8951c6

Please sign in to comment.