Skip to content

Commit

Permalink
feat: add secret controller in knode
Browse files Browse the repository at this point in the history
Signed-off-by: renxiangyu <[email protected]>
  • Loading branch information
renxiangyu committed Oct 23, 2023
1 parent a526400 commit 27ab48e
Show file tree
Hide file tree
Showing 2 changed files with 353 additions and 0 deletions.
67 changes: 67 additions & 0 deletions pkg/clustertree/knode-manager/adapters/k8s/secret.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package k8sadapter

import (
"context"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/kosmos.io/kosmos/pkg/clustertree/knode-manager/utils"
)

type SecretAdapter struct {
master kubernetes.Interface
client kubernetes.Interface
}

func NewSecretAdapter(ctx context.Context, ac *AdapterConfig) (*SecretAdapter, error) {
adapter := &SecretAdapter{
master: ac.Master,
client: ac.Client,
}

return adapter, nil
}

func (s *SecretAdapter) Update(ctx context.Context, secret, new *corev1.Secret) (*corev1.Secret, error) {
updateSecret := new
utils.SetObjectGlobal(&updateSecret.ObjectMeta)
newSecret, err := s.client.CoreV1().Secrets(secret.Namespace).Update(ctx, updateSecret, metav1.UpdateOptions{})
if err != nil {
return secret, err
}
return newSecret, nil
}

func (s *SecretAdapter) Delete(ctx context.Context, namespace string, secretName string) error {
err := s.client.CoreV1().Secrets(namespace).Delete(ctx, secretName, metav1.DeleteOptions{})
if err != nil {
if !errors.IsNotFound(err) {
return err
}
}
return nil
}

func (s *SecretAdapter) Create(ctx context.Context, secret *corev1.Secret) error {
err := s.createSecret(ctx, secret)
if err != nil {
return err
}
return nil
}

func (s *SecretAdapter) createSecret(ctx context.Context, secret *corev1.Secret) error {
utils.TrimObjectMeta(&secret.ObjectMeta)
utils.SetObjectGlobal(&secret.ObjectMeta)

_, err := s.client.CoreV1().Secrets(secret.Namespace).Create(ctx, secret, metav1.CreateOptions{})
if err != nil {
if !errors.IsAlreadyExists(err) {
return err
}
}
return nil
}
286 changes: 286 additions & 0 deletions pkg/clustertree/knode-manager/controllers/secret.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
package controllers

import (
"context"
"fmt"
"reflect"
"time"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"

"github.com/kosmos.io/kosmos/pkg/clustertree/knode-manager/adapters"
"github.com/kosmos.io/kosmos/pkg/clustertree/knode-manager/utils"
)

// SecretController is a controller sync secret from master cluster to client cluster
type SecretController struct {
handler adapters.SecretHandler

masterLister lister.SecretLister
masterSecretQueue workqueue.RateLimitingInterface
masterSynced cache.InformerSynced

clientLister lister.SecretLister
clientSecretQueue workqueue.RateLimitingInterface
clientSynced cache.InformerSynced
}

// NewSecretController returns a new *SecretController
func NewSecretController(adapter adapters.SecretHandler, mInformer, cInformer kubeinformers.SharedInformerFactory) (*SecretController, error) {
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second)
secret := &SecretController{
handler: adapter,
masterSecretQueue: workqueue.NewNamedRateLimitingQueue(rateLimiter, "Knode master secret queue"),
clientSecretQueue: workqueue.NewNamedRateLimitingQueue(rateLimiter, "Knode client secret queue"),
}

masterInformer := mInformer.Core().V1().Secrets()
clientInformer := cInformer.Core().V1().Secrets()

// master
if _, err := masterInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
UpdateFunc: secret.MasterUpdateSecret,
DeleteFunc: secret.MasterDeleteSecret,
}, 0); err != nil {
return nil, err
}
secret.masterLister = masterInformer.Lister()
secret.masterSynced = masterInformer.Informer().HasSynced

// client
if _, err := clientInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
UpdateFunc: secret.ClientUpdateSecret,
DeleteFunc: secret.ClientDeleteSecret,
}, 0); err != nil {
return nil, err
}
secret.clientLister = clientInformer.Lister()
secret.clientSynced = clientInformer.Informer().HasSynced

return secret, nil
}

func (s *SecretController) MasterUpdateSecret(oldObj, newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
runtime.HandleError(err)
return
}
s.masterSecretQueue.Add(key)
klog.Infof("enqueue master secret update key: %v", key)
}

func (s *SecretController) MasterDeleteSecret(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}
s.masterSecretQueue.Add(key)
klog.Infof("enqueue master secret delete key: %v", key)
}

func (s *SecretController) ClientUpdateSecret(oldObj, newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
runtime.HandleError(err)
return
}
s.clientSecretQueue.Add(key)
klog.Infof("enqueue client secret update key: %v", key)
}

func (s *SecretController) ClientDeleteSecret(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}
s.clientSecretQueue.Add(key)
klog.Infof("enqueue client secret delete key: %v", key)
}

// Run starts and listens on channel events
func (s *SecretController) Run(ctx context.Context) error {
defer s.masterSecretQueue.ShutDown()
defer s.clientSecretQueue.ShutDown()

klog.Info("kosmos starting secret controller")
defer klog.Info("kosmos shutting secret controller")

stopCh := ctx.Done()
workers := 1

if !cache.WaitForCacheSync(stopCh, s.masterSynced) {
return fmt.Errorf("kosmos cannot sync master secret cache")
}
klog.Info("kosmos sync master secret cache success")

if !cache.WaitForCacheSync(stopCh, s.clientSynced) {
return fmt.Errorf("kosmos cannot sync client secret cache")
}
klog.Info("kosmos sync client secret cache success")

for i := 0; i < workers; i++ {
go wait.Until(s.syncClientSecret, 0, stopCh)
go wait.Until(s.resyncClientSecret, 0, stopCh)
}

<-stopCh
return nil
}

// sync master secret -> client secret
func (s *SecretController) syncClientSecret() {
secret2key, result := s.masterSecretQueue.Get()
if result {
return
}
defer s.masterSecretQueue.Done(secret2key)
ns, secretName, err := cache.SplitMetaNamespaceKey(secret2key.(string))
if err != nil {
s.masterSecretQueue.Forget(secret2key.(string))
return
}

klog.Infof("kosmos sync secret: start sync secret, secret: %v", secret2key.(string))
secretLifecycle := false

secretInstance, err := s.masterLister.Secrets(ns).Get(secretName)
if err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("kosmos sync: get secret from master list failed, error: %v", err)
return
}
secretLifecycle = true
}
defer func() {
if err != nil {
s.masterSecretQueue.AddRateLimited(secret2key)
return
}
s.masterSecretQueue.Forget(secret2key)
}()
if secretLifecycle || secretInstance.DeletionTimestamp != nil {
err = s.handler.Delete(context.TODO(), ns, secretName)
if err != nil {
klog.Errorf("kosmos sync: delete secret from client list failed, secret: %v", secretName)
return
}
klog.Infof("kosmos sync: delete secret from client list succeed, secret: %v", secretName)
return
}

klog.Infof("Secret %v/%v to be update", ns, secretName)
s.syncSecretHandler(secretInstance)
}

// resync master secret -> client secret (Prevents the client secret from being incorrectly modified)
func (s *SecretController) resyncClientSecret() {
secret2key, result := s.clientSecretQueue.Get()
if result {
return
}
defer s.clientSecretQueue.Done(secret2key)
ns, secretName, err := cache.SplitMetaNamespaceKey(secret2key.(string))
if err != nil {
s.clientSecretQueue.Forget(secret2key.(string))
return
}

var masterSecret *corev1.Secret
var clientSecret *corev1.Secret

clientSecret, err = s.clientLister.Secrets(ns).Get(secretName)
if err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("kosmos sync: get secret from client list failed, error: %v", err)
return
}
masterSecretLifecycle := true
masterSecret, err = s.masterLister.Secrets(ns).Get(secretName)
if err != nil {
if !apierrors.IsNotFound(err) {
klog.Errorf("kosmos sync: get secret from master list failed, error: %v", err)
return
}
masterSecretLifecycle = false
}
// The client deletes its own secret. We need to synchronize the master's secret to the client
if masterSecretLifecycle && masterSecret.DeletionTimestamp == nil {
klog.Infof("kosmos sync: client secret %v/%v is deleted in error", masterSecret.Namespace, masterSecret.Name)
err := s.handler.Create(context.TODO(), masterSecret)
if err != nil {
klog.Errorf("kosmos sync: create client secret failed, error: %v", err)
return
}
klog.Infof("kosmos sync: client secret %v/%v created successfully", masterSecret.Namespace, masterSecret.Name)
}
return
}

defer func() {
if err != nil {
s.clientSecretQueue.AddRateLimited(secret2key)
return
}
s.clientSecretQueue.Forget(secret2key)
}()

masterSecret, err = s.masterLister.Secrets(ns).Get(secretName)
if err != nil {
klog.Errorf("kosmos sync: get secret from master list failed, error: %v", err)
return
}
if reflect.DeepEqual(clientSecret.Data, masterSecret.Data) &&
reflect.DeepEqual(clientSecret.StringData, masterSecret.StringData) {
return
}
// The client updates its own secret. We need to synchronize the master's secret to the client
klog.Infof("Client secret %v/%v is updated in error", ns, secretName)
secretCopy := masterSecret.DeepCopy()
utils.TrimObjectMeta(&secretCopy.ObjectMeta)
if _, err = s.handler.Update(context.TODO(), clientSecret, secretCopy); err != nil {
klog.Infof("Update client secret %v/%v err: %v", clientSecret.Namespace, clientSecret.Name, err)
return
}
klog.Infof("Client secret %v/%v updated successfully", ns, secretName)
}

func (s *SecretController) syncSecretHandler(secret *corev1.Secret) {
key, err := cache.MetaNamespaceKeyFunc(secret)
if err != nil {
runtime.HandleError(err)
return
}
defer func() {
if err != nil {
klog.Error(err)
s.masterSecretQueue.AddRateLimited(key)
return
}
}()
var secretInSub *corev1.Secret
secretInSub, err = s.clientLister.Secrets(secret.Namespace).Get(secret.Name)
if err != nil {
klog.Infof("secretInSub %v err: %v", secretInSub, err)
return
}

secretCopy := secret.DeepCopy()
utils.TrimObjectMeta(&secretCopy.ObjectMeta)
klog.Infof("Old secret %+v\n, new secret %+v", secretInSub, secretCopy)
if _, err = s.handler.Update(context.TODO(), secretInSub, secretCopy); err != nil {
klog.Infof("Update secret %v err: %v", secretInSub, err)
return
}
klog.Infof("Handler secret: finished processing %q", secret.Name)
}

0 comments on commit 27ab48e

Please sign in to comment.