From 27ab48e06d64a2627433944f45136003f13603b3 Mon Sep 17 00:00:00 2001 From: renxiangyu Date: Sat, 7 Oct 2023 19:38:40 +0800 Subject: [PATCH] feat: add secret controller in knode Signed-off-by: renxiangyu --- .../knode-manager/adapters/k8s/secret.go | 67 ++++ .../knode-manager/controllers/secret.go | 286 ++++++++++++++++++ 2 files changed, 353 insertions(+) create mode 100644 pkg/clustertree/knode-manager/adapters/k8s/secret.go create mode 100644 pkg/clustertree/knode-manager/controllers/secret.go diff --git a/pkg/clustertree/knode-manager/adapters/k8s/secret.go b/pkg/clustertree/knode-manager/adapters/k8s/secret.go new file mode 100644 index 000000000..790896c4e --- /dev/null +++ b/pkg/clustertree/knode-manager/adapters/k8s/secret.go @@ -0,0 +1,67 @@ +package k8sadapter + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + + "github.com/kosmos.io/kosmos/pkg/clustertree/knode-manager/utils" +) + +type SecretAdapter struct { + master kubernetes.Interface + client kubernetes.Interface +} + +func NewSecretAdapter(ctx context.Context, ac *AdapterConfig) (*SecretAdapter, error) { + adapter := &SecretAdapter{ + master: ac.Master, + client: ac.Client, + } + + return adapter, nil +} + +func (s *SecretAdapter) Update(ctx context.Context, secret, new *corev1.Secret) (*corev1.Secret, error) { + updateSecret := new + utils.SetObjectGlobal(&updateSecret.ObjectMeta) + newSecret, err := s.client.CoreV1().Secrets(secret.Namespace).Update(ctx, updateSecret, metav1.UpdateOptions{}) + if err != nil { + return secret, err + } + return newSecret, nil +} + +func (s *SecretAdapter) Delete(ctx context.Context, namespace string, secretName string) error { + err := s.client.CoreV1().Secrets(namespace).Delete(ctx, secretName, metav1.DeleteOptions{}) + if err != nil { + if !errors.IsNotFound(err) { + return err + } + } + return nil +} + +func (s *SecretAdapter) Create(ctx context.Context, secret *corev1.Secret) error { + err := s.createSecret(ctx, secret) + if err != nil { + return err + } + return nil +} + +func (s *SecretAdapter) createSecret(ctx context.Context, secret *corev1.Secret) error { + utils.TrimObjectMeta(&secret.ObjectMeta) + utils.SetObjectGlobal(&secret.ObjectMeta) + + _, err := s.client.CoreV1().Secrets(secret.Namespace).Create(ctx, secret, metav1.CreateOptions{}) + if err != nil { + if !errors.IsAlreadyExists(err) { + return err + } + } + return nil +} diff --git a/pkg/clustertree/knode-manager/controllers/secret.go b/pkg/clustertree/knode-manager/controllers/secret.go new file mode 100644 index 000000000..a11d50af6 --- /dev/null +++ b/pkg/clustertree/knode-manager/controllers/secret.go @@ -0,0 +1,286 @@ +package controllers + +import ( + "context" + "fmt" + "reflect" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + kubeinformers "k8s.io/client-go/informers" + lister "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog" + + "github.com/kosmos.io/kosmos/pkg/clustertree/knode-manager/adapters" + "github.com/kosmos.io/kosmos/pkg/clustertree/knode-manager/utils" +) + +// SecretController is a controller sync secret from master cluster to client cluster +type SecretController struct { + handler adapters.SecretHandler + + masterLister lister.SecretLister + masterSecretQueue workqueue.RateLimitingInterface + masterSynced cache.InformerSynced + + clientLister lister.SecretLister + clientSecretQueue workqueue.RateLimitingInterface + clientSynced cache.InformerSynced +} + +// NewSecretController returns a new *SecretController +func NewSecretController(adapter adapters.SecretHandler, mInformer, cInformer kubeinformers.SharedInformerFactory) (*SecretController, error) { + rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second) + secret := &SecretController{ + handler: adapter, + masterSecretQueue: workqueue.NewNamedRateLimitingQueue(rateLimiter, "Knode master secret queue"), + clientSecretQueue: workqueue.NewNamedRateLimitingQueue(rateLimiter, "Knode client secret queue"), + } + + masterInformer := mInformer.Core().V1().Secrets() + clientInformer := cInformer.Core().V1().Secrets() + + // master + if _, err := masterInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ + UpdateFunc: secret.MasterUpdateSecret, + DeleteFunc: secret.MasterDeleteSecret, + }, 0); err != nil { + return nil, err + } + secret.masterLister = masterInformer.Lister() + secret.masterSynced = masterInformer.Informer().HasSynced + + // client + if _, err := clientInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ + UpdateFunc: secret.ClientUpdateSecret, + DeleteFunc: secret.ClientDeleteSecret, + }, 0); err != nil { + return nil, err + } + secret.clientLister = clientInformer.Lister() + secret.clientSynced = clientInformer.Informer().HasSynced + + return secret, nil +} + +func (s *SecretController) MasterUpdateSecret(oldObj, newObj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(newObj) + if err != nil { + runtime.HandleError(err) + return + } + s.masterSecretQueue.Add(key) + klog.Infof("enqueue master secret update key: %v", key) +} + +func (s *SecretController) MasterDeleteSecret(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + runtime.HandleError(err) + return + } + s.masterSecretQueue.Add(key) + klog.Infof("enqueue master secret delete key: %v", key) +} + +func (s *SecretController) ClientUpdateSecret(oldObj, newObj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(newObj) + if err != nil { + runtime.HandleError(err) + return + } + s.clientSecretQueue.Add(key) + klog.Infof("enqueue client secret update key: %v", key) +} + +func (s *SecretController) ClientDeleteSecret(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + runtime.HandleError(err) + return + } + s.clientSecretQueue.Add(key) + klog.Infof("enqueue client secret delete key: %v", key) +} + +// Run starts and listens on channel events +func (s *SecretController) Run(ctx context.Context) error { + defer s.masterSecretQueue.ShutDown() + defer s.clientSecretQueue.ShutDown() + + klog.Info("kosmos starting secret controller") + defer klog.Info("kosmos shutting secret controller") + + stopCh := ctx.Done() + workers := 1 + + if !cache.WaitForCacheSync(stopCh, s.masterSynced) { + return fmt.Errorf("kosmos cannot sync master secret cache") + } + klog.Info("kosmos sync master secret cache success") + + if !cache.WaitForCacheSync(stopCh, s.clientSynced) { + return fmt.Errorf("kosmos cannot sync client secret cache") + } + klog.Info("kosmos sync client secret cache success") + + for i := 0; i < workers; i++ { + go wait.Until(s.syncClientSecret, 0, stopCh) + go wait.Until(s.resyncClientSecret, 0, stopCh) + } + + <-stopCh + return nil +} + +// sync master secret -> client secret +func (s *SecretController) syncClientSecret() { + secret2key, result := s.masterSecretQueue.Get() + if result { + return + } + defer s.masterSecretQueue.Done(secret2key) + ns, secretName, err := cache.SplitMetaNamespaceKey(secret2key.(string)) + if err != nil { + s.masterSecretQueue.Forget(secret2key.(string)) + return + } + + klog.Infof("kosmos sync secret: start sync secret, secret: %v", secret2key.(string)) + secretLifecycle := false + + secretInstance, err := s.masterLister.Secrets(ns).Get(secretName) + if err != nil { + if !apierrors.IsNotFound(err) { + klog.Errorf("kosmos sync: get secret from master list failed, error: %v", err) + return + } + secretLifecycle = true + } + defer func() { + if err != nil { + s.masterSecretQueue.AddRateLimited(secret2key) + return + } + s.masterSecretQueue.Forget(secret2key) + }() + if secretLifecycle || secretInstance.DeletionTimestamp != nil { + err = s.handler.Delete(context.TODO(), ns, secretName) + if err != nil { + klog.Errorf("kosmos sync: delete secret from client list failed, secret: %v", secretName) + return + } + klog.Infof("kosmos sync: delete secret from client list succeed, secret: %v", secretName) + return + } + + klog.Infof("Secret %v/%v to be update", ns, secretName) + s.syncSecretHandler(secretInstance) +} + +// resync master secret -> client secret (Prevents the client secret from being incorrectly modified) +func (s *SecretController) resyncClientSecret() { + secret2key, result := s.clientSecretQueue.Get() + if result { + return + } + defer s.clientSecretQueue.Done(secret2key) + ns, secretName, err := cache.SplitMetaNamespaceKey(secret2key.(string)) + if err != nil { + s.clientSecretQueue.Forget(secret2key.(string)) + return + } + + var masterSecret *corev1.Secret + var clientSecret *corev1.Secret + + clientSecret, err = s.clientLister.Secrets(ns).Get(secretName) + if err != nil { + if !apierrors.IsNotFound(err) { + klog.Errorf("kosmos sync: get secret from client list failed, error: %v", err) + return + } + masterSecretLifecycle := true + masterSecret, err = s.masterLister.Secrets(ns).Get(secretName) + if err != nil { + if !apierrors.IsNotFound(err) { + klog.Errorf("kosmos sync: get secret from master list failed, error: %v", err) + return + } + masterSecretLifecycle = false + } + // The client deletes its own secret. We need to synchronize the master's secret to the client + if masterSecretLifecycle && masterSecret.DeletionTimestamp == nil { + klog.Infof("kosmos sync: client secret %v/%v is deleted in error", masterSecret.Namespace, masterSecret.Name) + err := s.handler.Create(context.TODO(), masterSecret) + if err != nil { + klog.Errorf("kosmos sync: create client secret failed, error: %v", err) + return + } + klog.Infof("kosmos sync: client secret %v/%v created successfully", masterSecret.Namespace, masterSecret.Name) + } + return + } + + defer func() { + if err != nil { + s.clientSecretQueue.AddRateLimited(secret2key) + return + } + s.clientSecretQueue.Forget(secret2key) + }() + + masterSecret, err = s.masterLister.Secrets(ns).Get(secretName) + if err != nil { + klog.Errorf("kosmos sync: get secret from master list failed, error: %v", err) + return + } + if reflect.DeepEqual(clientSecret.Data, masterSecret.Data) && + reflect.DeepEqual(clientSecret.StringData, masterSecret.StringData) { + return + } + // The client updates its own secret. We need to synchronize the master's secret to the client + klog.Infof("Client secret %v/%v is updated in error", ns, secretName) + secretCopy := masterSecret.DeepCopy() + utils.TrimObjectMeta(&secretCopy.ObjectMeta) + if _, err = s.handler.Update(context.TODO(), clientSecret, secretCopy); err != nil { + klog.Infof("Update client secret %v/%v err: %v", clientSecret.Namespace, clientSecret.Name, err) + return + } + klog.Infof("Client secret %v/%v updated successfully", ns, secretName) +} + +func (s *SecretController) syncSecretHandler(secret *corev1.Secret) { + key, err := cache.MetaNamespaceKeyFunc(secret) + if err != nil { + runtime.HandleError(err) + return + } + defer func() { + if err != nil { + klog.Error(err) + s.masterSecretQueue.AddRateLimited(key) + return + } + }() + var secretInSub *corev1.Secret + secretInSub, err = s.clientLister.Secrets(secret.Namespace).Get(secret.Name) + if err != nil { + klog.Infof("secretInSub %v err: %v", secretInSub, err) + return + } + + secretCopy := secret.DeepCopy() + utils.TrimObjectMeta(&secretCopy.ObjectMeta) + klog.Infof("Old secret %+v\n, new secret %+v", secretInSub, secretCopy) + if _, err = s.handler.Update(context.TODO(), secretInSub, secretCopy); err != nil { + klog.Infof("Update secret %v err: %v", secretInSub, err) + return + } + klog.Infof("Handler secret: finished processing %q", secret.Name) +}