Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Panic in PackageManifest Server #590

Merged
merged 1 commit into from
Dec 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cmd/package-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ var (
func init() {
flags := cmd.Flags()

// flags.BoolVar(&options.InsecureKubeletTLS, "kubelet-insecure-tls", options.InsecureKubeletTLS, "Do not verify CA of serving certificates presented by Kubelets. For testing purposes only.")
flags.DurationVar(&options.WakeupInterval, "interval", options.WakeupInterval, "Interval at which to re-sync CatalogSources")
flags.StringVar(&options.GlobalNamespace, "global-namespace", options.GlobalNamespace, "Name of the namespace where the global CatalogSources are located")
flags.StringSliceVar(&options.WatchedNamespaces, "watched-namespaces", options.WatchedNamespaces, "List of namespaces the package-server will watch watch for CatalogSources")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ require (
k8s.io/code-generator v0.0.0-20180904193909-8c97d6ab64da
k8s.io/gengo v0.0.0-20181106084056-51747d6e00da // indirect
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92 // indirect
k8s.io/kube-aggregator v0.0.0-20181121072050-af204e4cff09
k8s.io/kube-aggregator v0.0.0-20180905000155-efa32eb095fe
k8s.io/kube-openapi v0.0.0-20181031203759-72693cb1fadd
k8s.io/kubernetes v1.11.6-beta.0.0.20181126160157-5933b9771b71
)
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,6 @@ k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92 h1:PgoMI/L1Nu5Vmvgm+vGheLuxKST8h6
k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/kube-aggregator v0.0.0-20180905000155-efa32eb095fe h1:LM48rywzVEPRg+Os2oUL9/vsztPQGoxmiD3m5VySchw=
k8s.io/kube-aggregator v0.0.0-20180905000155-efa32eb095fe/go.mod h1:8sbzT4QQKDEmSCIbfqjV0sd97GpUT7A4W626sBiYJmU=
k8s.io/kube-aggregator v0.0.0-20181121072050-af204e4cff09 h1:v5wOckd8yeVJcWcnE0xLdW60/Qrd17gXxW24O3aiNxg=
k8s.io/kube-aggregator v0.0.0-20181121072050-af204e4cff09/go.mod h1:8sbzT4QQKDEmSCIbfqjV0sd97GpUT7A4W626sBiYJmU=
k8s.io/kube-openapi v0.0.0-20181031203759-72693cb1fadd h1:ggv/Vfza0i5xuhUZyYyxcc25AmQvHY8Zi1C2m8WgBvA=
k8s.io/kube-openapi v0.0.0-20181031203759-72693cb1fadd/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc=
k8s.io/kubernetes v1.11.6-beta.0.0.20181126160157-5933b9771b71 h1:ZiDzUVY+KNDO1sbcG0hHZokQsNIhjCCCsy06Z4Ck4JA=
Expand Down
73 changes: 41 additions & 32 deletions pkg/package-server/provider/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,22 +91,22 @@ func parsePackageManifestsFromConfigMap(cm *corev1.ConfigMap, catsrc *operatorsv
logger.Debug("ConfigMap contains CSVs")
csvListJSON, err := yaml.YAMLToJSON([]byte(csvListYaml))
if err != nil {
logrus.Debugf("Load ConfigMap -- ERROR %s : error=%s", cmName, err)
logger.Debugf("Load ConfigMap -- ERROR %s : error=%s", cmName, err)
return nil, fmt.Errorf("error loading CSV list yaml from ConfigMap %s: %s", cmName, err)
}

var parsedCSVList []operatorsv1alpha1.ClusterServiceVersion
err = json.Unmarshal([]byte(csvListJSON), &parsedCSVList)
if err != nil {
logrus.Debugf("Load ConfigMap -- ERROR %s : error=%s", cmName, err)
logger.Debugf("Load ConfigMap -- ERROR %s : error=%s", cmName, err)
return nil, fmt.Errorf("error parsing CSV list (json) from ConfigMap %s: %s", cmName, err)
}

for _, csv := range parsedCSVList {
found = true

// TODO: add check for invalid CSV definitions
logrus.Debugf("found csv %s", csv.GetName())
logger.Debugf("found csv %s", csv.GetName())
csvs[csv.GetName()] = csv
}
}
Expand Down Expand Up @@ -175,7 +175,7 @@ func parsePackageManifestsFromConfigMap(cm *corev1.ConfigMap, catsrc *operatorsv
manifest.ObjectMeta.Labels[k] = v
}

logrus.Debugf("retrieved packagemanifest %s", manifest.GetName())
logger.Debugf("retrieved packagemanifest %s", manifest.GetName())
manifests = append(manifests, manifest)
}
}
Expand All @@ -196,6 +196,12 @@ func (m *InMemoryProvider) syncCatalogSource(obj interface{}) error {
return fmt.Errorf("casting catalog source failed")
}

logger := logrus.WithFields(logrus.Fields{
"Action": "Sync CatalogSource",
"name": catsrc.GetName(),
"namespace": catsrc.GetNamespace(),
})

var manifests []packagev1alpha1.PackageManifest

// handle by sourceType
Expand All @@ -217,7 +223,7 @@ func (m *InMemoryProvider) syncCatalogSource(obj interface{}) error {
return fmt.Errorf("catalog source %s in namespace %s source type %s not recognized", catsrc.GetName(), catsrc.GetNamespace(), catsrc.Spec.SourceType)
}

// update manifests
logger.Debug("updating in-memory PackageManifests")
m.mu.Lock()
defer m.mu.Unlock()
for _, manifest := range manifests {
Expand All @@ -228,18 +234,18 @@ func (m *InMemoryProvider) syncCatalogSource(obj interface{}) error {
}

if pm, ok := m.manifests[key]; ok {
// use existing CreationTimestamp
logger.Debugf("package %s already exists", key.packageName)
manifest.CreationTimestamp = pm.ObjectMeta.CreationTimestamp
} else {
// set CreationTimestamp if first time seeing the PackageManifest
logger.Debugf("new package %s found", key.packageName)
manifest.CreationTimestamp = metav1.NewTime(time.Now())
for _, add := range m.add {
if add.namespace == manifest.Status.CatalogSourceNamespace || add.namespace == metav1.NamespaceAll || manifest.Status.CatalogSourceNamespace == m.globalNamespace {
logger.Debugf("sending new package %s to watcher for namespace %s", key.packageName, add.namespace)
add.ch <- manifest
}
}
}

m.manifests[key] = manifest
}

Expand Down Expand Up @@ -276,45 +282,48 @@ func (m *InMemoryProvider) List(namespace string) (*packagev1alpha1.PackageManif
matching = append(matching, pm)
}
}

manifestList.Items = matching
}

return manifestList, nil
}

func (m *InMemoryProvider) Subscribe(namespace string, stopCh <-chan struct{}) (PackageChan, PackageChan, PackageChan, error) {
logger := logrus.WithFields(logrus.Fields{
"Action": "PackageManifest Subscribe",
"namespace": namespace,
})

m.mu.Lock()
defer m.mu.Unlock()

add := eventChan{namespace, make(chan packagev1alpha1.PackageManifest)}
modify := eventChan{namespace, make(chan packagev1alpha1.PackageManifest)}
delete := eventChan{namespace, make(chan packagev1alpha1.PackageManifest)}
addIndex := len(m.add)
modifyIndex := len(m.modify)
deleteIndex := len(m.delete)
m.add = append(m.add, add)
m.modify = append(m.modify, modify)
m.delete = append(m.delete, delete)
addEvent := eventChan{namespace, make(chan packagev1alpha1.PackageManifest)}
modifyEvent := eventChan{namespace, make(chan packagev1alpha1.PackageManifest)}
deleteEvent := eventChan{namespace, make(chan packagev1alpha1.PackageManifest)}
m.add = append(m.add, addEvent)
m.modify = append(m.modify, modifyEvent)
m.delete = append(m.delete, deleteEvent)

removeChan := func(target chan packagev1alpha1.PackageManifest, all []eventChan) []eventChan {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Couldn't this be defined as an unexported function since you don't need to close over the lexical scope here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meh, not used anywhere else. Prefer to keep things in the tightest scope possible (makes it nice to collapse in my editor).

for i, event := range all {
if event.ch == target {
logger.Debugf("closing channel")
close(event.ch)
return append(all[:i], all[i+1:]...)
}
}
return all
}

go func() {
<-stopCh
m.mu.Lock()
defer m.mu.Unlock()
for _, add := range m.add {
m.add = append(m.add[:addIndex], m.add[:addIndex+1]...)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the actual bug, we weren't removing channels from the slice correctly.

close(add.ch)
}
for _, modify := range m.modify {
m.modify = append(m.modify[:modifyIndex], m.modify[:modifyIndex+1]...)
close(modify.ch)
}
for _, delete := range m.delete {
m.delete = append(m.delete[:deleteIndex], m.delete[:deleteIndex+1]...)
close(delete.ch)
}

m.add = removeChan(addEvent.ch, m.add)
m.modify = removeChan(modifyEvent.ch, m.modify)
m.delete = removeChan(deleteEvent.ch, m.delete)
return
}()

return add.ch, modify.ch, delete.ch, nil
return addEvent.ch, modifyEvent.ch, deleteEvent.ch, nil
}
21 changes: 21 additions & 0 deletions test/e2e/packagemanifest_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ func TestPackageManifestLoading(t *testing.T) {
}

watcher, err := pmc.PackagemanifestV1alpha1().PackageManifests(testNamespace).Watch(metav1.ListOptions{})
defer watcher.Stop()
require.NoError(t, err)
receivedPackage := make(chan bool)
go func() {
event := <-watcher.ResultChan()
pkg := event.Object.(*packagev1alpha1.PackageManifest)
Expand All @@ -93,6 +95,7 @@ func TestPackageManifestLoading(t *testing.T) {
require.NotNil(t, pkg)
require.Equal(t, packageName, pkg.GetName())
require.Equal(t, expectedStatus, pkg.Status)
receivedPackage <- true
return
}()

Expand All @@ -102,8 +105,26 @@ func TestPackageManifestLoading(t *testing.T) {

pm, err := fetchPackageManifest(t, pmc, testNamespace, packageName, packageManifestHasStatus)

require.True(t, <-receivedPackage)
require.NoError(t, err, "error getting package manifest")
require.NotNil(t, pm)
require.Equal(t, packageName, pm.GetName())
require.Equal(t, expectedStatus, pm.Status)
}

func TestPackageManifestMultipleWatches(t *testing.T) {
pmc := newPMClient(t)

watcherA, _ := pmc.PackagemanifestV1alpha1().PackageManifests(testNamespace).Watch(metav1.ListOptions{})
watcherB, _ := pmc.PackagemanifestV1alpha1().PackageManifests(testNamespace).Watch(metav1.ListOptions{})
watcherC, _ := pmc.PackagemanifestV1alpha1().PackageManifests(testNamespace).Watch(metav1.ListOptions{})

defer watcherB.Stop()
defer watcherC.Stop()
watcherA.Stop()

list, err := pmc.PackagemanifestV1alpha1().PackageManifests(testNamespace).List(metav1.ListOptions{})

require.NoError(t, err)
require.NotEqual(t, 0, len(list.Items))
}