Skip to content

Commit

Permalink
Merge pull request operator-framework#402 from njhale/namespace-aware…
Browse files Browse the repository at this point in the history
…-resolution

feat(resolver): add namespace and channel awareness
  • Loading branch information
njhale authored Aug 8, 2018
2 parents 8c8705c + 1af58d1 commit bc76c3f
Show file tree
Hide file tree
Showing 16 changed files with 1,013 additions and 189 deletions.
332 changes: 262 additions & 70 deletions Gopkg.lock

Large diffs are not rendered by default.

48 changes: 33 additions & 15 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,9 @@ type Operator struct {
*queueinformer.Operator
client versioned.Interface
namespace string
sources map[registry.SourceKey]registry.Source
sources map[registry.ResourceKey]registry.Source
sourcesLock sync.RWMutex
sourcesLastUpdate metav1.Time
subscriptions map[registry.SubscriptionKey]v1alpha1.Subscription
subscriptionsLock sync.RWMutex
dependencyResolver resolver.DependencyResolver
subQueue workqueue.RateLimitingInterface
}
Expand Down Expand Up @@ -88,8 +86,7 @@ func NewOperator(kubeconfigPath string, wakeupInterval time.Duration, operatorNa
Operator: queueOperator,
client: crClient,
namespace: operatorNamespace,
sources: make(map[registry.SourceKey]registry.Source),
subscriptions: make(map[registry.SubscriptionKey]v1alpha1.Subscription),
sources: make(map[registry.ResourceKey]registry.Source),
dependencyResolver: &resolver.MultiSourceResolver{},
}

Expand Down Expand Up @@ -147,7 +144,7 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {

o.sourcesLock.Lock()
defer o.sourcesLock.Unlock()
o.sources[registry.SourceKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()}] = src
o.sources[registry.ResourceKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()}] = src
o.sourcesLastUpdate = timeNow()
return nil
}
Expand Down Expand Up @@ -181,7 +178,7 @@ func (o *Operator) syncSubscriptions(obj interface{}) (syncError error) {

updatedSub.Status.LastUpdated = timeNow()
// Update Subscription with status of transition. Log errors if we can't write them to the status.
if updatedSubFromApi, err := o.client.OperatorsV1alpha1().Subscriptions(updatedSub.GetNamespace()).UpdateStatus(updatedSub); err != nil {
if _, err := o.client.OperatorsV1alpha1().Subscriptions(updatedSub.GetNamespace()).UpdateStatus(updatedSub); err != nil {
logger = logger.WithField("updateError", err.Error())
updateErr := errors.New("error updating Subscription status: " + err.Error())
if syncError == nil {
Expand All @@ -190,12 +187,8 @@ func (o *Operator) syncSubscriptions(obj interface{}) (syncError error) {
}
logger.Info("error transitioning Subscription")
syncError = fmt.Errorf("error transitioning Subscription: %s and error updating Subscription status: %s", syncError, updateErr)
} else {
// map subscription
o.subscriptionsLock.Lock()
defer o.subscriptionsLock.Unlock()
o.subscriptions[registry.SubscriptionKey{Name: sub.GetName(), Namespace: sub.GetNamespace()}] = *updatedSubFromApi
}

return
}

Expand Down Expand Up @@ -332,10 +325,16 @@ func (o *Operator) ResolvePlan(plan *v1alpha1.InstallPlan) error {
}
sourcesSnapshot := o.getSourcesSnapshot(plan, includedNamespaces)

// Copy the subscriptions belonging to the install plans namespace
existingCRDOwners, err := o.getExistingCRDOwners(plan.Namespace)
if err != nil {
return err
}

// Attempt to resolve the InstallPlan
steps, usedSources, notFoundErr := o.dependencyResolver.ResolveInstallPlan(sourcesSnapshot, CatalogLabel, plan)
if notFoundErr != nil {
return notFoundErr
steps, usedSources, err := o.dependencyResolver.ResolveInstallPlan(sourcesSnapshot, existingCRDOwners, CatalogLabel, plan)
if err != nil {
return err
}

// Set the resolved steps
Expand Down Expand Up @@ -515,3 +514,22 @@ func (o *Operator) getSourcesSnapshot(plan *v1alpha1.InstallPlan, includedNamesp

return sourcesSnapshot
}

// getExistingCRDOwners creates a map of CRD names to existing owner CSVs in the given namespace
func (o *Operator) getExistingCRDOwners(namespace string) (map[string][]string, error) {
// Get a list of CSV CRs in the namespace
csvList, err := o.client.OperatorsV1alpha1().ClusterServiceVersions(namespace).List(metav1.ListOptions{})
if err != nil {
return nil, err
}

// Map CRD names to existing owner CSV CRs in the namespace
owners := make(map[string][]string)
for _, csv := range csvList.Items {
for _, crd := range csv.Spec.CustomResourceDefinitions.Owned {
owners[crd.Name] = append(owners[crd.Name], csv.GetName())
}
}

return owners, nil
}
2 changes: 1 addition & 1 deletion pkg/controller/operators/catalog/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (o *Operator) syncSubscription(in *v1alpha1.Subscription) (*v1alpha1.Subscr
if catalogNamespace == "" {
catalogNamespace = o.namespace
}
catalog, ok := o.sources[registry.SourceKey{Name: out.Spec.CatalogSource, Namespace: catalogNamespace}]
catalog, ok := o.sources[registry.ResourceKey{Name: out.Spec.CatalogSource, Namespace: catalogNamespace}]
if !ok {
return out, fmt.Errorf("unknown catalog source %s in namespace %s", out.Spec.CatalogSource, catalogNamespace)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/operators/catalog/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,8 +947,8 @@ func TestSyncSubscription(t *testing.T) {
op := &Operator{
client: clientFake,
namespace: "ns",
sources: map[registry.SourceKey]registry.Source{
registry.SourceKey{Name: tt.initial.catalogName, Namespace: "ns"}: catalogFake,
sources: map[registry.ResourceKey]registry.Source{
registry.ResourceKey{Name: tt.initial.catalogName, Namespace: "ns"}: catalogFake,
},
sourcesLastUpdate: tt.initial.sourcesLastUpdate,
dependencyResolver: &resolver.MultiSourceResolver{},
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/registry/configmap_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (d *ConfigMapCatalogResourceLoader) LoadCatalogResourcesFromConfigMap(catal
}
for _, packageManifest := range parsedPackageManifests {
found = true
if err := catalog.addPackageManifest(packageManifest); err != nil {
if err := catalog.AddPackageManifest(packageManifest); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/registry/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func LoadPackageFromFile(m *InMem, filepath string) (*PackageManifest, error) {

err = json.Unmarshal([]byte(packageJson), &pkg)

if err = m.addPackageManifest(pkg); err != nil {
if err = m.AddPackageManifest(pkg); err != nil {
return nil, fmt.Errorf("unable to set package found in catalog: %v", err)
}
return &pkg, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/registry/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (m *InMem) FindCSVForPackageNameUnderChannel(packageName string, channelNam
}

// addPackageManifest adds a new package manifest to the in memory catalog.
func (m *InMem) addPackageManifest(pkg PackageManifest) error {
func (m *InMem) AddPackageManifest(pkg PackageManifest) error {
if len(pkg.PackageName) == 0 {
return fmt.Errorf("Empty package name")
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/registry/mem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestFindCSVForPackageNameUnderChannel(t *testing.T) {
catalog.AddOrReplaceService(testCSVResourceAlpha)
catalog.AddOrReplaceService(testCSVResourceStable)

catalog.addPackageManifest(PackageManifest{
catalog.AddPackageManifest(PackageManifest{
PackageName: "mockservice",
Channels: []PackageChannel{
{
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestFindCSVForPackageNameUnderChannel(t *testing.T) {
func TestInvalidPackageManifest(t *testing.T) {
catalog := NewInMem()

err := catalog.addPackageManifest(PackageManifest{
err := catalog.AddPackageManifest(PackageManifest{
PackageName: "mockservice",
Channels: []PackageChannel{
{
Expand Down Expand Up @@ -254,7 +254,7 @@ func TestFindReplacementCSVForPackageNameUnderChannel(t *testing.T) {
catalog.AddOrReplaceService(testCSVResourceStable)
catalog.AddOrReplaceService(testCSVResourceReplaced)

catalog.addPackageManifest(PackageManifest{
catalog.AddPackageManifest(PackageManifest{
PackageName: "mockservice",
DefaultChannelName: "stable",
Channels: []PackageChannel{
Expand Down Expand Up @@ -374,7 +374,7 @@ func TestListLatestCSVsForCRD(t *testing.T) {
catalog.AddOrReplaceService(testCSVResourceStable)
catalog.AddOrReplaceService(testCSVResourceReplaced)

catalog.addPackageManifest(PackageManifest{
catalog.AddPackageManifest(PackageManifest{
PackageName: "mockservice",
DefaultChannelName: "stable",
Channels: []PackageChannel{
Expand Down
56 changes: 39 additions & 17 deletions pkg/controller/registry/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@ import (
// DependencyResolver defines how a something that resolves dependencies (CSVs, CRDs, etc...)
// should behave
type DependencyResolver interface {
ResolveInstallPlan(sourceRefs []registry.SourceRef, catalogLabelKey string, plan *v1alpha1.InstallPlan) ([]v1alpha1.Step, []registry.SourceKey, error)
ResolveInstallPlan(sourceRefs []registry.SourceRef, existingCRDOwners map[string][]string, catalogLabelKey string, plan *v1alpha1.InstallPlan) ([]v1alpha1.Step, []registry.ResourceKey, error)
}

// MultiSourceResolver resolves resolves dependencies from multiple CatalogSources
type MultiSourceResolver struct{}

// ResolveInstallPlan resolves the given InstallPlan with all available sources
func (resolver *MultiSourceResolver) ResolveInstallPlan(sourceRefs []registry.SourceRef, catalogLabelKey string, plan *v1alpha1.InstallPlan) ([]v1alpha1.Step, []registry.SourceKey, error) {
func (resolver *MultiSourceResolver) ResolveInstallPlan(sourceRefs []registry.SourceRef, existingCRDOwners map[string][]string, catalogLabelKey string, plan *v1alpha1.InstallPlan) ([]v1alpha1.Step, []registry.ResourceKey, error) {
srm := make(stepResourceMap)
var usedSourceKeys []registry.SourceKey
var usedSourceKeys []registry.ResourceKey

for _, csvName := range plan.Spec.ClusterServiceVersionNames {
csvSRM, used, err := resolver.resolveCSV(sourceRefs, catalogLabelKey, plan.Namespace, csvName)
csvSRM, used, err := resolver.resolveCSV(sourceRefs, existingCRDOwners, catalogLabelKey, plan.Namespace, csvName)
if err != nil {
// Could not resolve CSV in any source
return nil, nil, err
Expand All @@ -38,12 +38,12 @@ func (resolver *MultiSourceResolver) ResolveInstallPlan(sourceRefs []registry.So
return srm.Plan(), usedSourceKeys, nil
}

func (resolver *MultiSourceResolver) resolveCSV(sourceRefs []registry.SourceRef, catalogLabelKey, planNamespace, csvName string) (stepResourceMap, []registry.SourceKey, error) {
func (resolver *MultiSourceResolver) resolveCSV(sourceRefs []registry.SourceRef, existingCRDOwners map[string][]string, catalogLabelKey, planNamespace, csvName string) (stepResourceMap, []registry.ResourceKey, error) {
log.Debugf("resolving CSV with name: %s", csvName)

steps := make(stepResourceMap)
csvNamesToBeResolved := []string{csvName}
var usedSourceKeys []registry.SourceKey
var usedSourceKeys []registry.ResourceKey

for len(csvNamesToBeResolved) != 0 {
// Pop off a CSV name.
Expand All @@ -55,7 +55,7 @@ func (resolver *MultiSourceResolver) resolveCSV(sourceRefs []registry.SourceRef,
continue
}

var csvSourceKey registry.SourceKey
var csvSourceKey registry.ResourceKey
var csv *v1alpha1.ClusterServiceVersion
var err error

Expand All @@ -82,7 +82,7 @@ func (resolver *MultiSourceResolver) resolveCSV(sourceRefs []registry.SourceRef,
// Resolve each owned or required CRD for the CSV.
for _, crdDesc := range csv.GetAllCRDDescriptions() {
// Attempt to get CRD from same catalog source CSV was found in
step, owner, err := resolver.resolveCRDDescription(sourceRefs, catalogLabelKey, crdDesc, csv.OwnsCRD(crdDesc.Name))
step, owner, err := resolver.resolveCRDDescription(sourceRefs, existingCRDOwners, catalogLabelKey, planNamespace, crdDesc, csv.OwnsCRD(crdDesc.Name))
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -126,15 +126,16 @@ func (resolver *MultiSourceResolver) resolveCSV(sourceRefs []registry.SourceRef,
return steps, usedSourceKeys, nil
}

func (resolver *MultiSourceResolver) resolveCRDDescription(sourceRefs []registry.SourceRef, catalogLabelKey string, crdDesc v1alpha1.CRDDescription, owned bool) (v1alpha1.StepResource, string, error) {
logger := log.WithFields(log.Fields{"kind": crdDesc.Kind, "name": crdDesc.Name, "version": crdDesc.Version})
func (resolver *MultiSourceResolver) resolveCRDDescription(sourceRefs []registry.SourceRef, existingCRDOwners map[string][]string, catalogLabelKey, planNamespace string, crdDesc v1alpha1.CRDDescription, owned bool) (v1alpha1.StepResource, string, error) {
log.Debugf("resolving %#v", crdDesc)

crdKey := registry.CRDKey{
Kind: crdDesc.Kind,
Name: crdDesc.Name,
Version: crdDesc.Version,
}
logger.Debug("resolving")
var crdSourceKey registry.SourceKey

var crdSourceKey registry.ResourceKey
var crd *v1beta1.CustomResourceDefinition
var source registry.Source
var err error
Expand All @@ -155,8 +156,6 @@ func (resolver *MultiSourceResolver) resolveCRDDescription(sourceRefs []registry
return v1alpha1.StepResource{}, "", err
}

logger.Debugf("found")

if owned {
// Label CRD with catalog source
labels := crd.GetLabels()
Expand Down Expand Up @@ -184,9 +183,32 @@ func (resolver *MultiSourceResolver) resolveCRDDescription(sourceRefs []registry
return v1alpha1.StepResource{}, "", fmt.Errorf("Unknown CRD %s", crdKey)
}

// TODO: Change to lookup the CSV from the preferred or default channel.
logger.WithField("owner", csvs[0].CSV.Name).Info("found owner")
return v1alpha1.StepResource{}, csvs[0].CSV.Name, nil
var ownerName string
owners := existingCRDOwners[crdKey.Name]
switch len(owners) {
case 0:
// No pre-existing owner found
for _, csv := range csvs {
// Check for the default channel
if csv.IsDefaultChannel {
ownerName = csv.CSV.Name
break
}
}
case 1:
ownerName = owners[0]
default:
return v1alpha1.StepResource{}, "", fmt.Errorf("More than one existing owner for CRD %s in namespace %s found: %v", crdKey.Name, planNamespace, owners)
}

// Check empty name
if ownerName == "" {
log.Infof("No preexisting CSV or default channel found for owners of CRD %v", crdKey)
ownerName = csvs[0].CSV.Name
}

log.Infof("Found %v owner %s", crdKey, ownerName)
return v1alpha1.StepResource{}, ownerName, nil

}

Expand Down
Loading

0 comments on commit bc76c3f

Please sign in to comment.