diff --git a/internal/runtime/catalog/catalog.go b/internal/runtime/catalog/catalog.go index f9ecc79d3a84..45df37397b86 100644 --- a/internal/runtime/catalog/catalog.go +++ b/internal/runtime/catalog/catalog.go @@ -334,3 +334,11 @@ func (gvh GroupVersionHook) String() string { var emptyGroupVersionHook = GroupVersionHook{} var emptyGroupVersionKind = schema.GroupVersionKind{} + +// GroupHook represents Group and Hook of a GroupVersionHook. +// This can be used instead of GroupVersionHook when +// Version should not be used. +type GroupHook struct { + Group string + Hook string +} diff --git a/internal/runtime/registry/doc.go b/internal/runtime/registry/doc.go new file mode 100644 index 000000000000..1cc5aa71785e --- /dev/null +++ b/internal/runtime/registry/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package registry implements the RuntimeSDK registry. +package registry diff --git a/internal/runtime/registry/registry.go b/internal/runtime/registry/registry.go new file mode 100644 index 000000000000..330ef56eade1 --- /dev/null +++ b/internal/runtime/registry/registry.go @@ -0,0 +1,256 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +import ( + "sync" + + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + kerrors "k8s.io/apimachinery/pkg/util/errors" + + runtimev1 "sigs.k8s.io/cluster-api/exp/runtime/api/v1alpha1" + "sigs.k8s.io/cluster-api/internal/runtime/catalog" +) + +// ExtensionRegistry defines the funcs of a RuntimeExtension registry. +type ExtensionRegistry interface { + // WarmUp can be used to initialize a "cold" RuntimeExtension registry with all + // known runtimev1.ExtensionConfigs at a given time. + // After WarmUp completes the RuntimeExtension registry is considered ready. + WarmUp(extensionConfigList *runtimev1.ExtensionConfigList) error + + // IsReady returns true if the RuntimeExtension registry is ready for usage. + // This happens after WarmUp is completed. + IsReady() bool + + // Add adds all RuntimeExtensions of the given ExtensionConfig. + // Please note that if the ExtensionConfig has been added before, the + // corresponding registry entries will get updated/replaced with the + // one from the newly provided ExtensionConfig. + Add(extensionConfig *runtimev1.ExtensionConfig) error + + // Remove removes all RuntimeExtensions corresponding to the provided ExtensionConfig. + Remove(extensionConfig *runtimev1.ExtensionConfig) error + + // List all registered RuntimeExtensions for a given catalog.GroupHook. + List(gh catalog.GroupHook) ([]*ExtensionRegistration, error) + + // Get the RuntimeExtensions with the given name. + Get(name string) (*ExtensionRegistration, error) +} + +// ExtensionRegistration contains information about a registered RuntimeExtension. +type ExtensionRegistration struct { + // Name is the unique name of the RuntimeExtension. + Name string + + // ExtensionConfigName is the name of the corresponding ExtensionConfig. + ExtensionConfigName string + + // GroupVersionHook is the GroupVersionHook that the RuntimeExtension implements. + GroupVersionHook catalog.GroupVersionHook + + // ClientConfig is the ClientConfig to communicate with the RuntimeExtension. + ClientConfig runtimev1.ClientConfig + // TimeoutSeconds is the timeout duration used for calls to the RuntimeExtension. + TimeoutSeconds *int32 + // FailurePolicy defines how failures in calls to the RuntimeExtension should be handled by a client. + FailurePolicy *runtimev1.FailurePolicy +} + +// extensionRegistry is a implementation of ExtensionRegistry. +type extensionRegistry struct { + // ready represents if the registry has been warmed up. + ready bool + // items contains the registry entries. + items map[string]*ExtensionRegistration + // lock is used to synchronize access to fields of the extensionRegistry. + lock sync.RWMutex +} + +// New returns a new ExtensionRegistry. +func New() ExtensionRegistry { + return &extensionRegistry{ + items: map[string]*ExtensionRegistration{}, + } +} + +// WarmUp can be used to initialize a "cold" RuntimeExtension registry with all +// known runtimev1.ExtensionConfigs at a given time. +// After WarmUp completes the RuntimeExtension registry is considered ready. +func (r *extensionRegistry) WarmUp(extensionConfigList *runtimev1.ExtensionConfigList) error { + if extensionConfigList == nil { + return errors.New("invalid argument: when calling WarmUp extensionConfigList must not be nil") + } + + r.lock.Lock() + defer r.lock.Unlock() + + if r.ready { + return errors.New("invalid operation: WarmUp cannot be called on a registry which has already been warmed up") + } + + var allErrs []error + for i := range extensionConfigList.Items { + if err := r.add(&extensionConfigList.Items[i]); err != nil { + allErrs = append(allErrs, err) + } + } + if len(allErrs) > 0 { + // Reset the map, so that the next WarmUp can start with an empty map + // and doesn't inherit entries from this failed WarmUp. + r.items = map[string]*ExtensionRegistration{} + return kerrors.NewAggregate(allErrs) + } + + r.ready = true + return nil +} + +// IsReady returns true if the RuntimeExtension registry is ready for usage. +// This happens after WarmUp is completed. +func (r *extensionRegistry) IsReady() bool { + r.lock.RLock() + defer r.lock.RUnlock() + + return r.ready +} + +// Add adds all RuntimeExtensions of the given ExtensionConfig. +// Please note that if the ExtensionConfig has been added before, the +// corresponding registry entries will get updated/replaced with the +// one from the newly provided ExtensionConfig. +func (r *extensionRegistry) Add(extensionConfig *runtimev1.ExtensionConfig) error { + if extensionConfig == nil { + return errors.New("invalid argument: when calling Add extensionConfig must not be nil") + } + + r.lock.Lock() + defer r.lock.Unlock() + + if !r.ready { + return errors.New("invalid operation: Add cannot be called on a registry which has not been warmed up") + } + + return r.add(extensionConfig) +} + +// Remove removes all RuntimeExtensions corresponding to the provided ExtensionConfig. +func (r *extensionRegistry) Remove(extensionConfig *runtimev1.ExtensionConfig) error { + if extensionConfig == nil { + return errors.New("invalid argument: when calling Remove extensionConfig must not be nil") + } + + r.lock.Lock() + defer r.lock.Unlock() + + if !r.ready { + return errors.New("invalid operation: Remove cannot be called on a registry which has not been warmed up") + } + + r.remove(extensionConfig) + return nil +} + +func (r *extensionRegistry) remove(extensionConfig *runtimev1.ExtensionConfig) { + for _, e := range r.items { + if e.ExtensionConfigName == extensionConfig.Name { + delete(r.items, e.Name) + } + } +} + +// List all registered RuntimeExtensions for a given catalog.GroupHook. +func (r *extensionRegistry) List(gh catalog.GroupHook) ([]*ExtensionRegistration, error) { + if gh.Group == "" { + return nil, errors.New("invalid argument: when calling List gh.Group must not be empty") + } + if gh.Hook == "" { + return nil, errors.New("invalid argument: when calling List gh.Hook must not be empty") + } + + r.lock.RLock() + defer r.lock.RUnlock() + + if !r.ready { + return nil, errors.New("invalid operation: List cannot be called on a registry which has not been warmed up") + } + + l := []*ExtensionRegistration{} + for _, registration := range r.items { + if registration.GroupVersionHook.Group == gh.Group && registration.GroupVersionHook.Hook == gh.Hook { + l = append(l, registration) + } + } + return l, nil +} + +// Get the RuntimeExtensions with the given name. +func (r *extensionRegistry) Get(name string) (*ExtensionRegistration, error) { + r.lock.RLock() + defer r.lock.RUnlock() + + if !r.ready { + return nil, errors.New("invalid operation: Get cannot called on a registry not yet ready") + } + + registration, ok := r.items[name] + if !ok { + return nil, errors.Errorf("RuntimeExtension with name %q has not been registered", name) + } + + return registration, nil +} + +func (r *extensionRegistry) add(extensionConfig *runtimev1.ExtensionConfig) error { + r.remove(extensionConfig) + + var allErrs []error + registrations := []*ExtensionRegistration{} + for _, e := range extensionConfig.Status.Handlers { + gv, err := schema.ParseGroupVersion(e.RequestHook.APIVersion) + if err != nil { + allErrs = append(allErrs, errors.Wrapf(err, "failed to parse GroupVersion %q", e.RequestHook.APIVersion)) + continue + } + + // Registrations will only be added to the registry if no errors occur (all or nothing). + registrations = append(registrations, &ExtensionRegistration{ + ExtensionConfigName: extensionConfig.Name, + Name: e.Name, + GroupVersionHook: catalog.GroupVersionHook{ + Group: gv.Group, + Version: gv.Version, + Hook: e.RequestHook.Hook, + }, + ClientConfig: extensionConfig.Spec.ClientConfig, + TimeoutSeconds: e.TimeoutSeconds, + FailurePolicy: e.FailurePolicy, + }) + } + + if len(allErrs) > 0 { + return kerrors.NewAggregate(allErrs) + } + + for _, registration := range registrations { + r.items[registration.Name] = registration + } + + return nil +} diff --git a/internal/runtime/registry/registry_test.go b/internal/runtime/registry/registry_test.go new file mode 100644 index 000000000000..5764506884fd --- /dev/null +++ b/internal/runtime/registry/registry_test.go @@ -0,0 +1,232 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +import ( + "testing" + + . "github.com/onsi/gomega" + "github.com/onsi/gomega/format" + "github.com/onsi/gomega/types" + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" + + runtimev1 "sigs.k8s.io/cluster-api/exp/runtime/api/v1alpha1" + "sigs.k8s.io/cluster-api/internal/runtime/catalog" +) + +func TestColdRegistry(t *testing.T) { + g := NewWithT(t) + + r := New() + g.Expect(r.IsReady()).To(BeFalse()) + + // Add, Remove, List and Get should fail with a cold registry. + g.Expect(r.Add(&runtimev1.ExtensionConfig{})).ToNot(Succeed()) + g.Expect(r.Remove(&runtimev1.ExtensionConfig{})).ToNot(Succeed()) + _, err := r.List(catalog.GroupHook{Group: "foo", Hook: "bak"}) + g.Expect(err).To(HaveOccurred()) + _, err = r.Get("foo") + g.Expect(err).To(HaveOccurred()) +} + +func TestWarmUpRegistry(t *testing.T) { + g := NewWithT(t) + + extensionConfigList := &runtimev1.ExtensionConfigList{ + Items: []runtimev1.ExtensionConfig{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-extension", + }, + Status: runtimev1.ExtensionConfigStatus{ + Handlers: []runtimev1.ExtensionHandler{ + { + Name: "handler.test-extension", + RequestHook: runtimev1.GroupVersionHook{ + APIVersion: "foo/v1alpha1", + Hook: "bak", + }, + }, + }, + }, + }, + }, + } + + // WarmUp registry. + r := New() + g.Expect(r.WarmUp(extensionConfigList)).To(Succeed()) + g.Expect(r.IsReady()).To(BeTrue()) + + // A second WarmUp should fail, registry should stay ready. + g.Expect(r.WarmUp(extensionConfigList)).ToNot(Succeed()) + g.Expect(r.IsReady()).To(BeTrue()) + + // Add, Remove, List and Get should work with a warmed up registry. + g.Expect(r.Add(&runtimev1.ExtensionConfig{})).To(Succeed()) + g.Expect(r.Remove(&runtimev1.ExtensionConfig{})).To(Succeed()) + + registrations, err := r.List(catalog.GroupHook{Group: "foo", Hook: "bak"}) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(registrations).To(HaveLen(1)) + g.Expect(registrations[0].Name).To(Equal("handler.test-extension")) + + registration, err := r.Get("handler.test-extension") + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(registration.Name).To(Equal("handler.test-extension")) +} + +func TestRegistry(t *testing.T) { + g := NewWithT(t) + + extension1 := &runtimev1.ExtensionConfig{ + ObjectMeta: metav1.ObjectMeta{ + Name: "extension1", + }, + Spec: runtimev1.ExtensionConfigSpec{ + ClientConfig: runtimev1.ClientConfig{ + URL: pointer.String("https://extesions1.com/"), + }, + }, + Status: runtimev1.ExtensionConfigStatus{ + Handlers: []runtimev1.ExtensionHandler{ + { + Name: "foo.extension1", + RequestHook: runtimev1.GroupVersionHook{ + APIVersion: "hook.runtime.cluster.x-k8s.io/v1alpha1", + Hook: "BeforeClusterUpgrade", + }, + }, + { + Name: "bar.extension1", + RequestHook: runtimev1.GroupVersionHook{ + APIVersion: "hook.runtime.cluster.x-k8s.io/v1alpha1", + Hook: "BeforeClusterUpgrade", + }, + }, + { + Name: "baz.extension1", + RequestHook: runtimev1.GroupVersionHook{ + APIVersion: "hook.runtime.cluster.x-k8s.io/v1alpha1", + Hook: "AfterClusterUpgrade", + }, + }, + }, + }, + } + + extension2 := &runtimev1.ExtensionConfig{ + ObjectMeta: metav1.ObjectMeta{ + Name: "extension2", + }, + Spec: runtimev1.ExtensionConfigSpec{ + ClientConfig: runtimev1.ClientConfig{ + URL: pointer.String("https://extesions2.com/"), + }, + }, + Status: runtimev1.ExtensionConfigStatus{ + Handlers: []runtimev1.ExtensionHandler{ + { + Name: "qux.extension2", + RequestHook: runtimev1.GroupVersionHook{ + APIVersion: "hook.runtime.cluster.x-k8s.io/v1alpha1", + Hook: "AfterClusterUpgrade", + }, + }, + }, + }, + } + + // WarmUp with extension1 + e := New() + err := e.WarmUp(&runtimev1.ExtensionConfigList{Items: []runtimev1.ExtensionConfig{*extension1}}) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(e.IsReady()).To(BeTrue()) + + // Get an extension by name + registration, err := e.Get("foo.extension1") + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(registration.Name).To(Equal("foo.extension1")) + + // List all BeforeClusterUpgrade extensions + registrations, err := e.List(catalog.GroupHook{Group: "hook.runtime.cluster.x-k8s.io", Hook: "BeforeClusterUpgrade"}) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(registrations).To(HaveLen(2)) + g.Expect(registrations).To(ContainExtension("foo.extension1")) + g.Expect(registrations).To(ContainExtension("bar.extension1")) + + // List all AfterClusterUpgrade extensions + registrations, err = e.List(catalog.GroupHook{Group: "hook.runtime.cluster.x-k8s.io", Hook: "AfterClusterUpgrade"}) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(registrations).To(HaveLen(1)) + g.Expect(registrations).To(ContainExtension("baz.extension1")) + + // Add extension2 with one more AfterClusterUpgrade and check it is there + g.Expect(e.Add(extension2)).To(Succeed()) + + registrations, err = e.List(catalog.GroupHook{Group: "hook.runtime.cluster.x-k8s.io", Hook: "AfterClusterUpgrade"}) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(registrations).To(HaveLen(2)) + g.Expect(registrations).To(ContainExtension("baz.extension1")) + g.Expect(registrations).To(ContainExtension("qux.extension2")) + + // Remove extension1 and check everything is updated + g.Expect(e.Remove(extension1)).To(Succeed()) + + registrations, err = e.List(catalog.GroupHook{Group: "hook.runtime.cluster.x-k8s.io", Hook: "BeforeClusterUpgrade"}) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(registrations).To(HaveLen(0)) + + registrations, err = e.List(catalog.GroupHook{Group: "hook.runtime.cluster.x-k8s.io", Hook: "AfterClusterUpgrade"}) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(registrations).To(HaveLen(1)) + g.Expect(registrations).To(ContainExtension("qux.extension2")) +} + +func ContainExtension(name string) types.GomegaMatcher { + return &ContainExtensionMatcher{ + name: name, + } +} + +type ContainExtensionMatcher struct { + name string +} + +func (matcher *ContainExtensionMatcher) Match(actual interface{}) (success bool, err error) { + ext, ok := actual.([]*ExtensionRegistration) + if !ok { + return false, errors.Errorf("Expecting *ExtensionRegistration, got %t", actual) + } + + for _, e := range ext { + if e.Name == matcher.name { + return true, nil + } + } + return false, nil +} + +func (matcher *ContainExtensionMatcher) FailureMessage(actual interface{}) (message string) { + return format.Message(actual, "to contain element matching", matcher.name) +} + +func (matcher *ContainExtensionMatcher) NegatedFailureMessage(actual interface{}) (message string) { + return format.Message(actual, "not to contain element matching", matcher.name) +}