Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] feat(resolver): add namespace and channel awareness #402

Merged
merged 4 commits into from
Aug 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
332 changes: 262 additions & 70 deletions Gopkg.lock

Large diffs are not rendered by default.

48 changes: 33 additions & 15 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,9 @@ type Operator struct {
*queueinformer.Operator
client versioned.Interface
namespace string
sources map[registry.SourceKey]registry.Source
Copy link
Member

@ecordell ecordell Aug 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bigger wrench than I intended to throw when I started looking at this PR...

But it strikes me that this type of tracking (for both "sources" and "subscriptions") is exactly what the cache and indexers are supposed to do for you in client-go.

We already have SharedIndexInformers for all of this available to the operator, so we can probably just use the index provided there.

Take a look at the cache tooling and see if you agree: https://github.com/kubernetes/client-go/tree/master/tools/cache

I would guess that it's as simple as keeping track of the SharedIndexInformers by namespace and then querying those.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this makes a lot of sense when dealing with subscriptions since we don't need to do any additional work to pull it from the indexer's cache/store. Sources however, need an InMem instance constructed from the catalog source object and its config map before they are useful.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!

A future optimization to consider would be to disallow "internal" catalogsources (replacing them with pods that expose an API over the network), which would let us do the same caching for sources and remove the need to parse and keep all the contents in memory.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

Also, it's questionable at this point whether knowing about subscriptions at resolution time matters as much as knowing what's in the target namespace.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracked in ALM-679, ignore this for this PR

sources map[registry.ResourceKey]registry.Source
sourcesLock sync.RWMutex
sourcesLastUpdate metav1.Time
subscriptions map[registry.SubscriptionKey]v1alpha1.Subscription
subscriptionsLock sync.RWMutex
dependencyResolver resolver.DependencyResolver
subQueue workqueue.RateLimitingInterface
}
Expand Down Expand Up @@ -88,8 +86,7 @@ func NewOperator(kubeconfigPath string, wakeupInterval time.Duration, operatorNa
Operator: queueOperator,
client: crClient,
namespace: operatorNamespace,
sources: make(map[registry.SourceKey]registry.Source),
subscriptions: make(map[registry.SubscriptionKey]v1alpha1.Subscription),
sources: make(map[registry.ResourceKey]registry.Source),
dependencyResolver: &resolver.MultiSourceResolver{},
}

Expand Down Expand Up @@ -147,7 +144,7 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {

o.sourcesLock.Lock()
defer o.sourcesLock.Unlock()
o.sources[registry.SourceKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()}] = src
o.sources[registry.ResourceKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()}] = src
o.sourcesLastUpdate = timeNow()
return nil
}
Expand Down Expand Up @@ -181,7 +178,7 @@ func (o *Operator) syncSubscriptions(obj interface{}) (syncError error) {

updatedSub.Status.LastUpdated = timeNow()
// Update Subscription with status of transition. Log errors if we can't write them to the status.
if updatedSubFromApi, err := o.client.OperatorsV1alpha1().Subscriptions(updatedSub.GetNamespace()).UpdateStatus(updatedSub); err != nil {
if _, err := o.client.OperatorsV1alpha1().Subscriptions(updatedSub.GetNamespace()).UpdateStatus(updatedSub); err != nil {
logger = logger.WithField("updateError", err.Error())
updateErr := errors.New("error updating Subscription status: " + err.Error())
if syncError == nil {
Expand All @@ -190,12 +187,8 @@ func (o *Operator) syncSubscriptions(obj interface{}) (syncError error) {
}
logger.Info("error transitioning Subscription")
syncError = fmt.Errorf("error transitioning Subscription: %s and error updating Subscription status: %s", syncError, updateErr)
} else {
// map subscription
o.subscriptionsLock.Lock()
defer o.subscriptionsLock.Unlock()
o.subscriptions[registry.SubscriptionKey{Name: sub.GetName(), Namespace: sub.GetNamespace()}] = *updatedSubFromApi
}

return
}

Expand Down Expand Up @@ -332,10 +325,16 @@ func (o *Operator) ResolvePlan(plan *v1alpha1.InstallPlan) error {
}
sourcesSnapshot := o.getSourcesSnapshot(plan, includedNamespaces)

// Copy the subscriptions belonging to the install plans namespace
existingCRDOwners, err := o.getExistingCRDOwners(plan.Namespace)
if err != nil {
return err
}

// Attempt to resolve the InstallPlan
steps, usedSources, notFoundErr := o.dependencyResolver.ResolveInstallPlan(sourcesSnapshot, CatalogLabel, plan)
if notFoundErr != nil {
return notFoundErr
steps, usedSources, err := o.dependencyResolver.ResolveInstallPlan(sourcesSnapshot, existingCRDOwners, CatalogLabel, plan)
if err != nil {
return err
}

// Set the resolved steps
Expand Down Expand Up @@ -515,3 +514,22 @@ func (o *Operator) getSourcesSnapshot(plan *v1alpha1.InstallPlan, includedNamesp

return sourcesSnapshot
}

// getExistingCRDOwners creates a map of CRD names to existing owner CSVs in the given namespace
func (o *Operator) getExistingCRDOwners(namespace string) (map[string][]string, error) {
// Get a list of CSV CRs in the namespace
csvList, err := o.client.OperatorsV1alpha1().ClusterServiceVersions(namespace).List(metav1.ListOptions{})
if err != nil {
return nil, err
}

// Map CRD names to existing owner CSV CRs in the namespace
owners := make(map[string][]string)
for _, csv := range csvList.Items {
for _, crd := range csv.Spec.CustomResourceDefinitions.Owned {
owners[crd.Name] = append(owners[crd.Name], csv.GetName())
}
}

return owners, nil
}
2 changes: 1 addition & 1 deletion pkg/controller/operators/catalog/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (o *Operator) syncSubscription(in *v1alpha1.Subscription) (*v1alpha1.Subscr
if catalogNamespace == "" {
catalogNamespace = o.namespace
}
catalog, ok := o.sources[registry.SourceKey{Name: out.Spec.CatalogSource, Namespace: catalogNamespace}]
catalog, ok := o.sources[registry.ResourceKey{Name: out.Spec.CatalogSource, Namespace: catalogNamespace}]
if !ok {
return out, fmt.Errorf("unknown catalog source %s in namespace %s", out.Spec.CatalogSource, catalogNamespace)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/operators/catalog/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,8 +947,8 @@ func TestSyncSubscription(t *testing.T) {
op := &Operator{
client: clientFake,
namespace: "ns",
sources: map[registry.SourceKey]registry.Source{
registry.SourceKey{Name: tt.initial.catalogName, Namespace: "ns"}: catalogFake,
sources: map[registry.ResourceKey]registry.Source{
registry.ResourceKey{Name: tt.initial.catalogName, Namespace: "ns"}: catalogFake,
},
sourcesLastUpdate: tt.initial.sourcesLastUpdate,
dependencyResolver: &resolver.MultiSourceResolver{},
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/registry/configmap_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (d *ConfigMapCatalogResourceLoader) LoadCatalogResourcesFromConfigMap(catal
}
for _, packageManifest := range parsedPackageManifests {
found = true
if err := catalog.addPackageManifest(packageManifest); err != nil {
if err := catalog.AddPackageManifest(packageManifest); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/registry/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func LoadPackageFromFile(m *InMem, filepath string) (*PackageManifest, error) {

err = json.Unmarshal([]byte(packageJson), &pkg)

if err = m.addPackageManifest(pkg); err != nil {
if err = m.AddPackageManifest(pkg); err != nil {
return nil, fmt.Errorf("unable to set package found in catalog: %v", err)
}
return &pkg, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/registry/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (m *InMem) FindCSVForPackageNameUnderChannel(packageName string, channelNam
}

// addPackageManifest adds a new package manifest to the in memory catalog.
func (m *InMem) addPackageManifest(pkg PackageManifest) error {
func (m *InMem) AddPackageManifest(pkg PackageManifest) error {
if len(pkg.PackageName) == 0 {
return fmt.Errorf("Empty package name")
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/registry/mem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestFindCSVForPackageNameUnderChannel(t *testing.T) {
catalog.AddOrReplaceService(testCSVResourceAlpha)
catalog.AddOrReplaceService(testCSVResourceStable)

catalog.addPackageManifest(PackageManifest{
catalog.AddPackageManifest(PackageManifest{
PackageName: "mockservice",
Channels: []PackageChannel{
{
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestFindCSVForPackageNameUnderChannel(t *testing.T) {
func TestInvalidPackageManifest(t *testing.T) {
catalog := NewInMem()

err := catalog.addPackageManifest(PackageManifest{
err := catalog.AddPackageManifest(PackageManifest{
PackageName: "mockservice",
Channels: []PackageChannel{
{
Expand Down Expand Up @@ -254,7 +254,7 @@ func TestFindReplacementCSVForPackageNameUnderChannel(t *testing.T) {
catalog.AddOrReplaceService(testCSVResourceStable)
catalog.AddOrReplaceService(testCSVResourceReplaced)

catalog.addPackageManifest(PackageManifest{
catalog.AddPackageManifest(PackageManifest{
PackageName: "mockservice",
DefaultChannelName: "stable",
Channels: []PackageChannel{
Expand Down Expand Up @@ -374,7 +374,7 @@ func TestListLatestCSVsForCRD(t *testing.T) {
catalog.AddOrReplaceService(testCSVResourceStable)
catalog.AddOrReplaceService(testCSVResourceReplaced)

catalog.addPackageManifest(PackageManifest{
catalog.AddPackageManifest(PackageManifest{
PackageName: "mockservice",
DefaultChannelName: "stable",
Channels: []PackageChannel{
Expand Down
56 changes: 39 additions & 17 deletions pkg/controller/registry/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@ import (
// DependencyResolver defines how a something that resolves dependencies (CSVs, CRDs, etc...)
// should behave
type DependencyResolver interface {
ResolveInstallPlan(sourceRefs []registry.SourceRef, catalogLabelKey string, plan *v1alpha1.InstallPlan) ([]v1alpha1.Step, []registry.SourceKey, error)
ResolveInstallPlan(sourceRefs []registry.SourceRef, existingCRDOwners map[string][]string, catalogLabelKey string, plan *v1alpha1.InstallPlan) ([]v1alpha1.Step, []registry.ResourceKey, error)
}

// MultiSourceResolver resolves resolves dependencies from multiple CatalogSources
type MultiSourceResolver struct{}

// ResolveInstallPlan resolves the given InstallPlan with all available sources
func (resolver *MultiSourceResolver) ResolveInstallPlan(sourceRefs []registry.SourceRef, catalogLabelKey string, plan *v1alpha1.InstallPlan) ([]v1alpha1.Step, []registry.SourceKey, error) {
func (resolver *MultiSourceResolver) ResolveInstallPlan(sourceRefs []registry.SourceRef, existingCRDOwners map[string][]string, catalogLabelKey string, plan *v1alpha1.InstallPlan) ([]v1alpha1.Step, []registry.ResourceKey, error) {
srm := make(stepResourceMap)
var usedSourceKeys []registry.SourceKey
var usedSourceKeys []registry.ResourceKey

for _, csvName := range plan.Spec.ClusterServiceVersionNames {
csvSRM, used, err := resolver.resolveCSV(sourceRefs, catalogLabelKey, plan.Namespace, csvName)
csvSRM, used, err := resolver.resolveCSV(sourceRefs, existingCRDOwners, catalogLabelKey, plan.Namespace, csvName)
if err != nil {
// Could not resolve CSV in any source
return nil, nil, err
Expand All @@ -38,12 +38,12 @@ func (resolver *MultiSourceResolver) ResolveInstallPlan(sourceRefs []registry.So
return srm.Plan(), usedSourceKeys, nil
}

func (resolver *MultiSourceResolver) resolveCSV(sourceRefs []registry.SourceRef, catalogLabelKey, planNamespace, csvName string) (stepResourceMap, []registry.SourceKey, error) {
func (resolver *MultiSourceResolver) resolveCSV(sourceRefs []registry.SourceRef, existingCRDOwners map[string][]string, catalogLabelKey, planNamespace, csvName string) (stepResourceMap, []registry.ResourceKey, error) {
log.Debugf("resolving CSV with name: %s", csvName)

steps := make(stepResourceMap)
csvNamesToBeResolved := []string{csvName}
var usedSourceKeys []registry.SourceKey
var usedSourceKeys []registry.ResourceKey

for len(csvNamesToBeResolved) != 0 {
// Pop off a CSV name.
Expand All @@ -55,7 +55,7 @@ func (resolver *MultiSourceResolver) resolveCSV(sourceRefs []registry.SourceRef,
continue
}

var csvSourceKey registry.SourceKey
var csvSourceKey registry.ResourceKey
var csv *v1alpha1.ClusterServiceVersion
var err error

Expand All @@ -82,7 +82,7 @@ func (resolver *MultiSourceResolver) resolveCSV(sourceRefs []registry.SourceRef,
// Resolve each owned or required CRD for the CSV.
for _, crdDesc := range csv.GetAllCRDDescriptions() {
// Attempt to get CRD from same catalog source CSV was found in
step, owner, err := resolver.resolveCRDDescription(sourceRefs, catalogLabelKey, crdDesc, csv.OwnsCRD(crdDesc.Name))
step, owner, err := resolver.resolveCRDDescription(sourceRefs, existingCRDOwners, catalogLabelKey, planNamespace, crdDesc, csv.OwnsCRD(crdDesc.Name))
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -126,15 +126,16 @@ func (resolver *MultiSourceResolver) resolveCSV(sourceRefs []registry.SourceRef,
return steps, usedSourceKeys, nil
}

func (resolver *MultiSourceResolver) resolveCRDDescription(sourceRefs []registry.SourceRef, catalogLabelKey string, crdDesc v1alpha1.CRDDescription, owned bool) (v1alpha1.StepResource, string, error) {
logger := log.WithFields(log.Fields{"kind": crdDesc.Kind, "name": crdDesc.Name, "version": crdDesc.Version})
func (resolver *MultiSourceResolver) resolveCRDDescription(sourceRefs []registry.SourceRef, existingCRDOwners map[string][]string, catalogLabelKey, planNamespace string, crdDesc v1alpha1.CRDDescription, owned bool) (v1alpha1.StepResource, string, error) {
log.Debugf("resolving %#v", crdDesc)

crdKey := registry.CRDKey{
Kind: crdDesc.Kind,
Name: crdDesc.Name,
Version: crdDesc.Version,
}
logger.Debug("resolving")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for cleaning up logs!

var crdSourceKey registry.SourceKey

var crdSourceKey registry.ResourceKey
var crd *v1beta1.CustomResourceDefinition
var source registry.Source
var err error
Expand All @@ -155,8 +156,6 @@ func (resolver *MultiSourceResolver) resolveCRDDescription(sourceRefs []registry
return v1alpha1.StepResource{}, "", err
}

logger.Debugf("found")

if owned {
// Label CRD with catalog source
labels := crd.GetLabels()
Expand Down Expand Up @@ -184,9 +183,32 @@ func (resolver *MultiSourceResolver) resolveCRDDescription(sourceRefs []registry
return v1alpha1.StepResource{}, "", fmt.Errorf("Unknown CRD %s", crdKey)
}

// TODO: Change to lookup the CSV from the preferred or default channel.
logger.WithField("owner", csvs[0].CSV.Name).Info("found owner")
return v1alpha1.StepResource{}, csvs[0].CSV.Name, nil
var ownerName string
owners := existingCRDOwners[crdKey.Name]
switch len(owners) {
case 0:
// No pre-existing owner found
for _, csv := range csvs {
// Check for the default channel
if csv.IsDefaultChannel {
ownerName = csv.CSV.Name
break
}
}
case 1:
ownerName = owners[0]
default:
return v1alpha1.StepResource{}, "", fmt.Errorf("More than one existing owner for CRD %s in namespace %s found: %v", crdKey.Name, planNamespace, owners)
}

// Check empty name
if ownerName == "" {
log.Infof("No preexisting CSV or default channel found for owners of CRD %v", crdKey)
ownerName = csvs[0].CSV.Name
}

log.Infof("Found %v owner %s", crdKey, ownerName)
return v1alpha1.StepResource{}, ownerName, nil

}

Expand Down
Loading