diff --git a/pkg/yurthub/filter/servicetopology/filter.go b/pkg/yurthub/filter/servicetopology/filter.go index 04d91fb4743..ac32e9c85f6 100644 --- a/pkg/yurthub/filter/servicetopology/filter.go +++ b/pkg/yurthub/filter/servicetopology/filter.go @@ -20,7 +20,7 @@ import ( "context" v1 "k8s.io/api/core/v1" - discovery "k8s.io/api/discovery/v1" + discoveryv1 "k8s.io/api/discovery/v1" discoveryV1beta1 "k8s.io/api/discovery/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -130,7 +130,7 @@ func (stf *serviceTopologyFilter) Filter(obj runtime.Object, stopCh <-chan struc } switch v := obj.(type) { - case *v1.Endpoints, *discoveryV1beta1.EndpointSlice, *discovery.EndpointSlice: + case *v1.Endpoints, *discoveryV1beta1.EndpointSlice, *discoveryv1.EndpointSlice: return stf.serviceTopologyHandler(v) default: return obj @@ -164,9 +164,9 @@ func (stf *serviceTopologyFilter) resolveServiceTopologyType(obj runtime.Object) case *discoveryV1beta1.EndpointSlice: svcNamespace = v.Namespace svcName = v.Labels[discoveryV1beta1.LabelServiceName] - case *discovery.EndpointSlice: + case *discoveryv1.EndpointSlice: svcNamespace = v.Namespace - svcName = v.Labels[discovery.LabelServiceName] + svcName = v.Labels[discoveryv1.LabelServiceName] case *v1.Endpoints: svcNamespace = v.Namespace svcName = v.Name @@ -190,7 +190,7 @@ func (stf *serviceTopologyFilter) nodeTopologyHandler(obj runtime.Object) runtim switch v := obj.(type) { case *discoveryV1beta1.EndpointSlice: return reassembleV1beta1EndpointSlice(v, stf.nodeName, nil) - case *discovery.EndpointSlice: + case *discoveryv1.EndpointSlice: return reassembleEndpointSlice(v, stf.nodeName, nil) case *v1.Endpoints: return reassembleEndpoints(v, stf.nodeName, nil) @@ -215,7 +215,7 @@ func (stf *serviceTopologyFilter) nodePoolTopologyHandler(obj runtime.Object) ru switch v := obj.(type) { case *discoveryV1beta1.EndpointSlice: return reassembleV1beta1EndpointSlice(v, "", nodes) - case *discovery.EndpointSlice: + case *discoveryv1.EndpointSlice: return reassembleEndpointSlice(v, "", nodes) case *v1.Endpoints: return reassembleEndpoints(v, "", nodes) @@ -252,13 +252,13 @@ func reassembleV1beta1EndpointSlice(endpointSlice *discoveryV1beta1.EndpointSlic } // reassembleEndpointSlice will discard endpoints that are not on the same node/nodePool for v1.EndpointSlice -func reassembleEndpointSlice(endpointSlice *discovery.EndpointSlice, nodeName string, nodes []string) *discovery.EndpointSlice { +func reassembleEndpointSlice(endpointSlice *discoveryv1.EndpointSlice, nodeName string, nodes []string) *discoveryv1.EndpointSlice { if len(nodeName) != 0 && len(nodes) != 0 { klog.Warningf("reassembleEndpointSlice: nodeName(%s) and nodePool can not be set at the same time", nodeName) return endpointSlice } - var newEps []discovery.Endpoint + var newEps []discoveryv1.Endpoint for i := range endpointSlice.Endpoints { if len(nodeName) != 0 { if *endpointSlice.Endpoints[i].NodeName == nodeName { diff --git a/pkg/yurthub/multiplexer/cache.go b/pkg/yurthub/multiplexer/cache.go new file mode 100644 index 00000000000..ac89511e5f9 --- /dev/null +++ b/pkg/yurthub/multiplexer/cache.go @@ -0,0 +1,74 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "context" + "fmt" + "sync" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + kstorage "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/cacher" + "k8s.io/client-go/kubernetes/scheme" +) + +type Interface interface { + Watch(ctx context.Context, key string, opts kstorage.ListOptions) (watch.Interface, error) + Get(ctx context.Context, key string, opts kstorage.GetOptions, objPtr runtime.Object) error + GetList(ctx context.Context, key string, opts kstorage.ListOptions, listObj runtime.Object) error +} + +type ResourceCacheConfig struct { + KeyFunc func(runtime.Object) (string, error) + NewFunc func() runtime.Object + NewListFunc func() runtime.Object + GetAttrsFunc kstorage.AttrFunc +} + +func NewResourceCache( + s kstorage.Interface, + resource *schema.GroupVersionResource, + config *ResourceCacheConfig) (Interface, func(), error) { + + cacheConfig := cacher.Config{ + Storage: s, + Versioner: kstorage.APIObjectVersioner{}, + GroupResource: resource.GroupResource(), + KeyFunc: config.KeyFunc, + NewFunc: config.NewFunc, + NewListFunc: config.NewListFunc, + GetAttrsFunc: config.GetAttrsFunc, + Codec: scheme.Codecs.LegacyCodec(resource.GroupVersion()), + } + + cacher, err := cacher.NewCacherFromConfig(cacheConfig) + if err != nil { + return nil, func() {}, fmt.Errorf("failed to new cacher from config, error: %v", err) + } + + var once sync.Once + destroyFunc := func() { + once.Do(func() { + cacher.Stop() + }) + } + + return cacher, destroyFunc, nil +} diff --git a/pkg/yurthub/multiplexer/cache_test.go b/pkg/yurthub/multiplexer/cache_test.go new file mode 100644 index 00000000000..21b04e0410d --- /dev/null +++ b/pkg/yurthub/multiplexer/cache_test.go @@ -0,0 +1,156 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/storage" + + ystorage "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" +) + +var serviceGVR = &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "services", +} + +var newServiceFunc = func() runtime.Object { + return &v1.Service{} +} + +var newServiceListFunc = func() runtime.Object { + return &v1.ServiceList{} +} + +func TestResourceCache_GetList(t *testing.T) { + cache, _, err := NewResourceCache( + ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}), + serviceGVR, + &ResourceCacheConfig{ + keyFunc, + newServiceFunc, + newServiceListFunc, + attrsFunc, + }, + ) + + assert.Nil(t, err) + assertCacheGetList(t, cache) +} + +func mockListOptions() storage.ListOptions { + return storage.ListOptions{ + ResourceVersion: "100", + Recursive: true, + Predicate: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + }, + } +} + +func assertCacheGetList(t testing.TB, cache Interface) { + t.Helper() + + serviceList := &v1.ServiceList{} + err := cache.GetList(context.Background(), "", mockListOptions(), serviceList) + + assert.Nil(t, err) + assert.Equal(t, 1, len(serviceList.Items)) +} + +func TestResourceCache_Watch(t *testing.T) { + fakeStorage := ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}) + + cache, _, err := NewResourceCache( + fakeStorage, + serviceGVR, + &ResourceCacheConfig{ + keyFunc, + newServiceFunc, + newServiceListFunc, + attrsFunc, + }, + ) + + assert.Nil(t, err) + assertCacheWatch(t, cache, fakeStorage) +} + +func mockWatchOptions() storage.ListOptions { + var sendInitialEvents = true + + return storage.ListOptions{ + ResourceVersion: "100", + Predicate: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + }, + Recursive: true, + SendInitialEvents: &sendInitialEvents, + } +} + +func assertCacheWatch(t testing.TB, cache Interface, fs *ystorage.FakeServiceStorage) { + receive, err := cache.Watch(context.TODO(), "", mockWatchOptions()) + + go func() { + fs.AddWatchObject(newService(metav1.NamespaceSystem, "coredns2")) + }() + + assert.Nil(t, err) + event := <-receive.ResultChan() + assert.Equal(t, watch.Added, event.Type) +} + +func TestResourceCache_Get(t *testing.T) { + cache, _, err := NewResourceCache( + ystorage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}), + serviceGVR, + &ResourceCacheConfig{ + keyFunc, + newServiceFunc, + newServiceListFunc, + attrsFunc, + }, + ) + assert.Nil(t, err) + assertCacheGet(t, cache) +} + +func assertCacheGet(t testing.TB, cache Interface) { + t.Helper() + + service := &v1.Service{} + err := cache.Get(context.Background(), "/kube-system/coredns", storage.GetOptions{ + ResourceVersion: "1", + }, service) + + assert.Nil(t, err) + assert.Equal(t, "coredns", service.Name) +} diff --git a/pkg/yurthub/multiplexer/manager.go b/pkg/yurthub/multiplexer/manager.go new file mode 100644 index 00000000000..aa27a104f71 --- /dev/null +++ b/pkg/yurthub/multiplexer/manager.go @@ -0,0 +1,171 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "github.com/pkg/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes/scheme" + + kmeta "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta" + ystorage "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" +) + +var keyFunc = func(obj runtime.Object) (string, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + return "", err + } + + name := accessor.GetName() + if len(name) == 0 { + return "", apierrors.NewBadRequest("Name parameter required.") + } + + ns := accessor.GetNamespace() + if len(ns) == 0 { + return "/" + name, nil + } + return "/" + ns + "/" + name, nil +} + +var attrsFunc = func(obj runtime.Object) (labels.Set, fields.Set, error) { + metadata, err := meta.Accessor(obj) + if err != nil { + return nil, nil, err + } + + var fieldSet fields.Set + if len(metadata.GetNamespace()) > 0 { + fieldSet = fields.Set{ + "metadata.name": metadata.GetName(), + "metadata.namespace": metadata.GetNamespace(), + } + } else { + fieldSet = fields.Set{ + "metadata.name": metadata.GetName(), + } + } + + return labels.Set(metadata.GetLabels()), fieldSet, nil +} + +type MultiplexerManager interface { + ResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) + ResourceCache(gvr *schema.GroupVersionResource) (Interface, func(), error) +} + +type multiplexerManager struct { + restStoreManager ystorage.StorageManager + restMapper meta.RESTMapper + cacheMap map[string]Interface + cacheConfigMap map[string]*ResourceCacheConfig + cacheDestroyFuncMap map[string]func() +} + +func NewRequestsMultiplexerManager( + restStoreManager ystorage.StorageManager) MultiplexerManager { + + return &multiplexerManager{ + restStoreManager: restStoreManager, + restMapper: kmeta.NewDefaultRESTMapperFromScheme(), + cacheMap: make(map[string]Interface), + cacheConfigMap: make(map[string]*ResourceCacheConfig), + cacheDestroyFuncMap: make(map[string]func()), + } +} + +func (m *multiplexerManager) ResourceCacheConfig(gvr *schema.GroupVersionResource) (*ResourceCacheConfig, error) { + if config, ok := m.cacheConfigMap[gvr.String()]; ok { + return config, nil + } + + gvk, listGVK, err := m.convertToGVK(gvr) + if err != nil { + return nil, errors.Wrapf(err, "failed to convert to gvk from gvr %s", gvr.String()) + } + + config := m.newResourceCacheConfig(gvk, listGVK) + + m.cacheConfigMap[gvr.String()] = config + return config, nil +} + +func (m *multiplexerManager) convertToGVK(gvr *schema.GroupVersionResource) (schema.GroupVersionKind, schema.GroupVersionKind, error) { + gvk, err := m.restMapper.KindFor(*gvr) + if err != nil { + return schema.GroupVersionKind{}, schema.GroupVersionKind{}, errors.Wrapf(err, "failed to convert gvk from gvr %s", gvr.String()) + } + + listGvk := schema.GroupVersionKind{ + Group: gvr.Group, + Version: gvr.Version, + Kind: gvk.Kind + "List", + } + + return gvk, listGvk, nil +} + +func (m *multiplexerManager) newResourceCacheConfig(gvk schema.GroupVersionKind, + listGVK schema.GroupVersionKind) *ResourceCacheConfig { + + resourceCacheConfig := &ResourceCacheConfig{ + NewFunc: func() runtime.Object { + obj, _ := scheme.Scheme.New(gvk) + return obj + }, + NewListFunc: func() (object runtime.Object) { + objList, _ := scheme.Scheme.New(listGVK) + return objList + }, + KeyFunc: keyFunc, + GetAttrsFunc: attrsFunc, + } + + return resourceCacheConfig +} + +func (m *multiplexerManager) ResourceCache(gvr *schema.GroupVersionResource) (Interface, func(), error) { + if sc, ok := m.cacheMap[gvr.String()]; ok { + return sc, m.cacheDestroyFuncMap[gvr.String()], nil + } + + restStore, err := m.restStoreManager.ResourceStorage(gvr) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to get rest store") + } + + resourceCacheConfig, err := m.ResourceCacheConfig(gvr) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to generate resource cache config") + } + + sc, destroy, err := NewResourceCache(restStore, gvr, resourceCacheConfig) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to new resource cache") + } + + m.cacheMap[gvr.String()] = sc + m.cacheDestroyFuncMap[gvr.String()] = destroy + + return sc, destroy, nil +} diff --git a/pkg/yurthub/multiplexer/manager_test.go b/pkg/yurthub/multiplexer/manager_test.go new file mode 100644 index 00000000000..0ee45cde9ec --- /dev/null +++ b/pkg/yurthub/multiplexer/manager_test.go @@ -0,0 +1,180 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multiplexer + +import ( + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + kstorage "k8s.io/apiserver/pkg/storage" + + "github.com/openyurtio/openyurt/pkg/yurthub/multiplexer/storage" +) + +func TestShareCacheManager_ResourceCacheConfig(t *testing.T) { + svcStorage := storage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}) + storageMap := map[string]kstorage.Interface{ + serviceGVR.String(): svcStorage, + } + + sm := NewRequestsMultiplexerManager( + storage.NewDummyStorageManager(storageMap)) + + for _, tc := range []struct { + tname string + gvr *schema.GroupVersionResource + obj runtime.Object + expectedKey string + expectedObjType string + expectedObjListType string + expectedFieldSet fields.Set + namespaceScoped bool + }{ + { + "generate resource config for services", + &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "services", + }, + newService(metav1.NamespaceSystem, "coredns"), + "/kube-system/coredns", + "Service", + "ServiceList", + fields.Set{ + "metadata.name": "coredns", + "metadata.namespace": "kube-system", + }, + true, + }, + { + "generate resource config for endpointslices", + &schema.GroupVersionResource{ + Group: "discovery.k8s.io", + Version: "v1", + Resource: "endpointslices", + }, + newEndpointSlice(), + "/kube-system/coredns-12345", + "EndpointSlice", + "EndpointSliceList", + fields.Set{ + "metadata.name": "coredns-12345", + "metadata.namespace": "kube-system", + }, + true, + }, + { + "generate resource config for nodes", + &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "nodes", + }, + newNode(), + "/test", + "Node", + "NodeList", + fields.Set{ + "metadata.name": "test", + }, + false, + }, + } { + t.Run(tc.tname, func(t *testing.T) { + rc, err := sm.ResourceCacheConfig(tc.gvr) + + assert.Nil(t, err) + + key, _ := rc.KeyFunc(tc.obj) + assert.Equal(t, tc.expectedKey, key) + + obj := rc.NewFunc() + assert.Equal(t, tc.expectedObjType, reflect.TypeOf(obj).Elem().Name()) + + objList := rc.NewListFunc() + assert.Equal(t, tc.expectedObjListType, reflect.TypeOf(objList).Elem().Name()) + + _, fieldSet, _ := rc.GetAttrsFunc(tc.obj) + assert.Equal(t, tc.expectedFieldSet, fieldSet) + }) + } +} + +func newService(namespace, name string) *v1.Service { + return &v1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + } +} + +func newEndpointSlice() *discovery.EndpointSlice { + return &discovery.EndpointSlice{ + TypeMeta: metav1.TypeMeta{ + Kind: "EndpointSlice", + APIVersion: "discovery.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "kube-system", + Name: "coredns-12345", + }, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"192.168.0.10"}, + }, + }, + } +} + +func newNode() *v1.Node { + return &v1.Node{ + TypeMeta: metav1.TypeMeta{ + Kind: "Node", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + } +} + +func TestShareCacheManager_ResourceCache(t *testing.T) { + svcStorage := storage.NewFakeServiceStorage([]v1.Service{*newService(metav1.NamespaceSystem, "coredns")}) + storageMap := map[string]kstorage.Interface{ + serviceGVR.String(): svcStorage, + } + + dsm := storage.NewDummyStorageManager(storageMap) + scm := NewRequestsMultiplexerManager(dsm) + cache, _, err := scm.ResourceCache(serviceGVR) + + assert.Nil(t, err) + assertCacheGetList(t, cache) +} diff --git a/pkg/yurthub/multiplexer/storage/fake_storage.go b/pkg/yurthub/multiplexer/storage/fake_storage.go new file mode 100644 index 00000000000..9bd8a356bb4 --- /dev/null +++ b/pkg/yurthub/multiplexer/storage/fake_storage.go @@ -0,0 +1,94 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "context" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/storage" +) + +type CommonFakeStorage struct { +} + +func (fs *CommonFakeStorage) Versioner() storage.Versioner { + return nil +} + +func (fs *CommonFakeStorage) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { + return nil +} + +func (fs *CommonFakeStorage) Delete( + ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, + validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error { + return nil +} + +func (fs *CommonFakeStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { + return nil +} + +func (fs *CommonFakeStorage) GuaranteedUpdate( + ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool, + preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error { + return nil +} + +func (fs *CommonFakeStorage) Count(key string) (int64, error) { + return 0, nil +} + +func (fs *CommonFakeStorage) RequestWatchProgress(ctx context.Context) error { + return nil +} + +type FakeServiceStorage struct { + *CommonFakeStorage + items []v1.Service + watcher *watch.FakeWatcher +} + +func NewFakeServiceStorage(items []v1.Service) *FakeServiceStorage { + return &FakeServiceStorage{ + CommonFakeStorage: &CommonFakeStorage{}, + items: items, + watcher: watch.NewFake(), + } +} + +func (fs *FakeServiceStorage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + serviceList := listObj.(*v1.ServiceList) + serviceList.ListMeta = metav1.ListMeta{ + ResourceVersion: "100", + } + serviceList.Items = fs.items + return nil +} + +func (fs *FakeServiceStorage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { + return fs.watcher, nil +} + +func (fs *FakeServiceStorage) AddWatchObject(svc *v1.Service) { + svc.ResourceVersion = "101" + fs.watcher.Add(svc) +} diff --git a/pkg/yurthub/multiplexer/storage/fake_storagemanager.go b/pkg/yurthub/multiplexer/storage/fake_storagemanager.go new file mode 100644 index 00000000000..30dcff91f3d --- /dev/null +++ b/pkg/yurthub/multiplexer/storage/fake_storagemanager.go @@ -0,0 +1,42 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/storage" +) + +type DummyStorageManager struct { + StorageMap map[string]storage.Interface + Err error +} + +func NewDummyStorageManager(storageMap map[string]storage.Interface) *DummyStorageManager { + return &DummyStorageManager{ + StorageMap: storageMap, + Err: nil, + } +} + +func (dsm *DummyStorageManager) ResourceStorage(gvr *schema.GroupVersionResource) (storage.Interface, error) { + if store, ok := dsm.StorageMap[gvr.String()]; ok { + return store, dsm.Err + } + + return dsm.StorageMap[gvr.String()], dsm.Err +} diff --git a/pkg/yurthub/multiplexer/storage/manager.go b/pkg/yurthub/multiplexer/storage/manager.go new file mode 100644 index 00000000000..45a08439d0c --- /dev/null +++ b/pkg/yurthub/multiplexer/storage/manager.go @@ -0,0 +1,80 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/storage" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" +) + +type StorageManager interface { + ResourceStorage(gvr *schema.GroupVersionResource) (storage.Interface, error) +} + +type storageManager struct { + config *rest.Config + storageMap map[string]storage.Interface +} + +func NewStorageManager(config *rest.Config) StorageManager { + config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() + return &storageManager{ + config: config, + storageMap: make(map[string]storage.Interface), + } +} + +func (sm *storageManager) ResourceStorage(gvr *schema.GroupVersionResource) (storage.Interface, error) { + if rs, ok := sm.storageMap[gvr.String()]; ok { + return rs, nil + } + + restClient, err := sm.restClient(gvr) + if err != nil { + return nil, errors.Wrapf(err, "failed to get rest client for %v", gvr) + } + + rs := &store{ + resource: gvr.Resource, + restClient: restClient, + } + + sm.storageMap[gvr.String()] = rs + + return rs, nil +} + +func (sm *storageManager) restClient(gvr *schema.GroupVersionResource) (rest.Interface, error) { + httpClient, _ := rest.HTTPClientFor(sm.config) + configShallowCopy := *sm.config + configShallowCopy.APIPath = getAPIPath(gvr) + + gv := gvr.GroupVersion() + configShallowCopy.GroupVersion = &gv + + return rest.RESTClientForConfigAndClient(&configShallowCopy, httpClient) +} + +func getAPIPath(gvr *schema.GroupVersionResource) string { + if gvr.Group == "" { + return "/api" + } + return "/apis" +} diff --git a/pkg/yurthub/multiplexer/storage/manager_test.go b/pkg/yurthub/multiplexer/storage/manager_test.go new file mode 100644 index 00000000000..513adfdb60b --- /dev/null +++ b/pkg/yurthub/multiplexer/storage/manager_test.go @@ -0,0 +1,78 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/storage" + "k8s.io/client-go/rest" +) + +var serviceGVR = &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "services", +} + +var endpointSlicesGVR = &schema.GroupVersionResource{ + Group: "discovery.k8s.io", + Version: "v1", + Resource: "endpointslices", +} + +func TestStorageManager_ResourceStorage(t *testing.T) { + sm := NewStorageManager(&rest.Config{ + Host: "http://127.0.0.1:10261", + UserAgent: "share-hub", + }) + + for _, tc := range []struct { + tName string + gvr *schema.GroupVersionResource + Err error + }{ + { + "get resource storage for services", + serviceGVR, + nil, + }, + { + "get resource storage for endpouintslices", + endpointSlicesGVR, + nil, + }, + } { + t.Run(tc.tName, func(t *testing.T) { + restore, err := sm.ResourceStorage(tc.gvr) + + assert.Nil(t, err) + assertResourceStore(t, tc.gvr, restore) + }) + } +} + +func assertResourceStore(t testing.TB, gvr *schema.GroupVersionResource, getRestStore storage.Interface) { + t.Helper() + + store, ok := getRestStore.(*store) + assert.Equal(t, true, ok) + assert.Equal(t, gvr.Resource, store.resource) + assert.Equal(t, gvr.GroupVersion(), store.restClient.APIVersion()) +} diff --git a/pkg/yurthub/multiplexer/storage/store.go b/pkg/yurthub/multiplexer/storage/store.go new file mode 100644 index 00000000000..1f8e1017b02 --- /dev/null +++ b/pkg/yurthub/multiplexer/storage/store.go @@ -0,0 +1,103 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "context" + "math/rand" + + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/storage" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" +) + +const minWatchRequestSeconds = 300 + +var ErrNoSupport = errors.New("Don't Support Method ") + +type store struct { + restClient rest.Interface + resource string +} + +func NewStore(restClient rest.Interface, resource string) *store { + return &store{ + restClient: restClient, + resource: resource, + } +} + +func (rs *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + listOpts := &metav1.ListOptions{ + Limit: opts.Predicate.Limit, + Continue: opts.Predicate.Continue, + ResourceVersionMatch: opts.ResourceVersionMatch, + ResourceVersion: opts.ResourceVersion, + } + + return rs.restClient.Get().Resource(rs.resource).VersionedParams(listOpts, scheme.ParameterCodec).Do(ctx).Into(listObj) +} + +func (rs *store) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { + timeoutSeconds := int64(float64(minWatchRequestSeconds) * (rand.Float64() + 1.0)) + + listOpts := &metav1.ListOptions{ + ResourceVersion: opts.ResourceVersion, + Watch: true, + TimeoutSeconds: &timeoutSeconds, + } + + w, err := rs.restClient.Get().Resource(rs.resource).VersionedParams(listOpts, scheme.ParameterCodec).Watch(ctx) + + return w, err +} + +func (rs *store) Versioner() storage.Versioner { + return nil +} + +func (rs *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { + return ErrNoSupport +} + +func (rs *store) Delete( + ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, + validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error { + return ErrNoSupport +} + +func (rs *store) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { + return ErrNoSupport +} + +func (rs *store) GuaranteedUpdate( + ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool, + preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error { + return ErrNoSupport +} + +func (rs *store) Count(key string) (int64, error) { + return 0, ErrNoSupport +} + +func (rs *store) RequestWatchProgress(ctx context.Context) error { + return ErrNoSupport +} diff --git a/pkg/yurthub/multiplexer/storage/store_test.go b/pkg/yurthub/multiplexer/storage/store_test.go new file mode 100644 index 00000000000..d3f1dc809bc --- /dev/null +++ b/pkg/yurthub/multiplexer/storage/store_test.go @@ -0,0 +1,267 @@ +/* +Copyright 2024 The OpenYurt Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "bytes" + "context" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer/streaming" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/storage" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest/fake" +) + +var ( + corev1GV = schema.GroupVersion{Version: "v1"} + corev1Codec = scheme.Codecs.CodecForVersions(scheme.Codecs.LegacyCodec(corev1GV), scheme.Codecs.UniversalDecoder(corev1GV), corev1GV, corev1GV) + + discoveryGV = schema.GroupVersion{Group: "discovery.k8s.io", Version: "v1"} + discoveryv1Codec = scheme.Codecs.CodecForVersions(scheme.Codecs.LegacyCodec(discoveryGV), scheme.Codecs.UniversalDecoder(discoveryGV), discoveryGV, discoveryGV) +) + +func TestRestStore_GetList(t *testing.T) { + t.Run(" list services", func(t *testing.T) { + rs := &store{ + restClient: newFakeClient(corev1GV, mockServiceListBody(), newListHeader()), + } + + getListObj := &corev1.ServiceList{} + err := rs.GetList(context.Background(), "", storage.ListOptions{}, getListObj) + + assert.Nil(t, err) + assert.Equal(t, 1, len(getListObj.Items)) + }) + + t.Run("list endpointslices", func(t *testing.T) { + rs := &store{ + restClient: newFakeClient(corev1GV, mockEndpointSlicesListBody(), newListHeader()), + } + + getListObj := &discovery.EndpointSliceList{} + err := rs.GetList(context.Background(), "", storage.ListOptions{}, getListObj) + + assert.Nil(t, err) + assert.Equal(t, 1, len(getListObj.Items)) + }) +} + +func newListHeader() http.Header { + header := http.Header{} + header.Set("Content-Type", runtime.ContentTypeJSON) + return header +} + +func mockServiceListBody() []byte { + str := runtime.EncodeOrDie(corev1Codec, newServiceList()) + return []byte(str) +} + +func mockEndpointSlicesListBody() []byte { + str := runtime.EncodeOrDie(discoveryv1Codec, newEndpointSliceList()) + return []byte(str) +} + +func newServiceList() *corev1.ServiceList { + return &corev1.ServiceList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + Items: []corev1.Service{ + *newService(), + }, + } +} + +func newFakeClient(gv schema.GroupVersion, body []byte, header http.Header) *fake.RESTClient { + return &fake.RESTClient{ + GroupVersion: gv, + NegotiatedSerializer: scheme.Codecs.WithoutConversion(), + Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Header: header, + Body: io.NopCloser(bytes.NewReader(body)), + }, nil + }), + } +} + +func newEndpointSliceList() *discovery.EndpointSliceList { + return &discovery.EndpointSliceList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + Items: []discovery.EndpointSlice{ + newEndpointSlice(), + }, + } +} + +func newEndpointSlice() discovery.EndpointSlice { + return discovery.EndpointSlice{ + TypeMeta: metav1.TypeMeta{ + Kind: "EndpointSlice", + APIVersion: "discovery.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "coredns-12345", + Namespace: "kube-system", + }, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"192.168.0.1"}, + NodeName: newStringPointer("node1"), + }, + { + Addresses: []string{"192.168.1.1"}, + NodeName: newStringPointer("node2"), + }, + { + Addresses: []string{"192.168.2.3"}, + NodeName: newStringPointer("node3"), + }, + }, + } +} + +func newStringPointer(str string) *string { + return &str +} + +func TestRestStore_Watch(t *testing.T) { + rs := &store{ + restClient: newFakeClient(corev1GV, mockServiceWatchBody(), newWatchHeader()), + } + + resultCh, err := rs.Watch(context.Background(), "", storage.ListOptions{}) + event := <-resultCh.ResultChan() + + assert.Nil(t, err) + assert.Equal(t, event.Type, watch.Added) +} + +func newWatchHeader() http.Header { + header := http.Header{} + header.Set("Transfer-Encoding", "chunked") + header.Set("Content-Type", runtime.ContentTypeJSON) + return header +} + +func mockServiceWatchBody() []byte { + serializer := scheme.Codecs.SupportedMediaTypes()[0] + framer := serializer.StreamSerializer.Framer + streamSerializer := serializer.StreamSerializer.Serializer + encoder := scheme.Codecs.EncoderForVersion(streamSerializer, corev1GV) + + buf := &bytes.Buffer{} + fb := framer.NewFrameWriter(buf) + + e := streaming.NewEncoder(fb, encoder) + + e.Encode(newOutEvent(newService())) + + return buf.Bytes() +} + +func newOutEvent(object runtime.Object) *metav1.WatchEvent { + internalEvent := metav1.InternalEvent{ + Type: watch.Added, + Object: object, + } + + outEvent := &metav1.WatchEvent{} + metav1.Convert_v1_InternalEvent_To_v1_WatchEvent(&internalEvent, outEvent, nil) + + return outEvent +} + +func TestRestStore_Versioner(t *testing.T) { + rs := &store{} + + assert.Nil(t, rs.Versioner()) +} + +func TestRestStore_Create(t *testing.T) { + rs := &store{} + err := rs.Create(context.TODO(), "", newService(), newService(), 1) + + assert.Equal(t, ErrNoSupport, err) +} + +func TestRestStore_Delete(t *testing.T) { + rs := &store{} + err := rs.Delete(context.TODO(), "", newService(), nil, nil, nil) + + assert.Equal(t, ErrNoSupport, err) +} + +func TestRestStore_Get(t *testing.T) { + rs := &store{} + err := rs.Get(context.TODO(), "", storage.GetOptions{}, nil) + + assert.Equal(t, ErrNoSupport, err) +} + +func TestRestStore_GuaranteedUpdate(t *testing.T) { + rs := &store{} + err := rs.GuaranteedUpdate(context.TODO(), "", newService(), false, nil, nil, nil) + + assert.Equal(t, ErrNoSupport, err) +} + +func TestRestStore_Count(t *testing.T) { + rs := &store{} + _, err := rs.Count("") + + assert.Equal(t, ErrNoSupport, err) +} + +func TestRestStore_RequestWatchProgress(t *testing.T) { + rs := &store{} + err := rs.RequestWatchProgress(context.TODO()) + + assert.Equal(t, ErrNoSupport, err) +} + +func newService() *corev1.Service { + return &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "kube-dns", + Namespace: "kube-system", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "192.168.0.10", + }, + } +} diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go index 194ca1ca896..51b30774c60 100644 --- a/pkg/yurthub/server/server.go +++ b/pkg/yurthub/server/server.go @@ -41,6 +41,7 @@ func RunYurtHubServers(cfg *config.YurtHubConfiguration, proxyHandler http.Handler, rest *rest.RestConfigManager, stopCh <-chan struct{}) error { + hubServerHandler := mux.NewRouter() registerHandlers(hubServerHandler, cfg, rest) @@ -72,7 +73,6 @@ func RunYurtHubServers(cfg *config.YurtHubConfiguration, return err } } - return nil } diff --git a/pkg/yurthub/util/util.go b/pkg/yurthub/util/util.go index 79820689593..51429d9c85f 100644 --- a/pkg/yurthub/util/util.go +++ b/pkg/yurthub/util/util.go @@ -92,7 +92,7 @@ const ( ) var ( - DefaultCacheAgents = []string{"kubelet", "kube-proxy", "flanneld", "coredns", "raven-agent-ds", projectinfo.GetAgentName(), projectinfo.GetHubName(), coordinatorconstants.DefaultPoolScopedUserAgent} + DefaultCacheAgents = []string{"kubelet", "kube-proxy", "flanneld", "coredns", "raven-agent-ds", "share-hub", projectinfo.GetAgentName(), projectinfo.GetHubName(), coordinatorconstants.DefaultPoolScopedUserAgent} YurthubConfigMapName = fmt.Sprintf("%s-hub-cfg", strings.TrimRightFunc(projectinfo.GetProjectPrefix(), func(c rune) bool { return c == '-' })) )