Skip to content

Commit

Permalink
draft: support serverless leaf cluster
Browse files Browse the repository at this point in the history
Signed-off-by: OrangeBao <[email protected]>
  • Loading branch information
OrangeBao committed Dec 14, 2023
1 parent b7a02b7 commit 4dbe546
Show file tree
Hide file tree
Showing 37 changed files with 3,833 additions and 563 deletions.
15 changes: 7 additions & 8 deletions cmd/clustertree/cluster-manager/app/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,16 +213,15 @@ func run(ctx context.Context, opts *options.Options) error {
}
}

// init rootPodController
rootPodReconciler := podcontrollers.RootPodReconciler{
GlobalLeafManager: globalleafManager,
RootClient: mgr.GetClient(),
rootPodWorkerQueue := podcontrollers.NewRootPodWorkerQueue(&podcontrollers.RootPodWorkerQueueOption{
Config: config,
RootClient: rootClient,
DynamicRootClient: dynamicClient,
GlobalLeafManager: globalleafManager,
Options: opts,
}
if err := rootPodReconciler.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting rootPodReconciler %s: %v", podcontrollers.RootPodControllerName, err)
}
})

go rootPodWorkerQueue.Run(ctx)

if !opts.OnewayStorageControllers {
rootPVCController := pvc.RootPVCController{
Expand Down
8 changes: 8 additions & 0 deletions deploy/crds/kosmos.io_clusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ spec:
type: object
clusterTreeOptions:
properties:
accressKey:
description: secret?
type: string
enable:
default: true
type: boolean
Expand Down Expand Up @@ -223,6 +226,11 @@ spec:
type: array
type: object
type: array
leafType:
default: k8s
type: string
secretKey:
type: string
type: object
imageRepository:
type: string
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ require (
github.com/go-logr/logr v1.2.3
github.com/gogo/protobuf v1.3.2
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.1
github.com/mitchellh/mapstructure v1.5.0
github.com/olekukonko/tablewriter v0.0.4
github.com/onsi/ginkgo/v2 v2.9.2
github.com/onsi/gomega v1.27.4
Expand Down Expand Up @@ -97,7 +99,6 @@ require (
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,8 @@ github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0Qu
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f/go.mod h1:OkQIRizQZAeMln+1tSwduZz7+Af5oFlKirV/MSYes2A=
github.com/moby/ipvs v1.0.1/go.mod h1:2pngiyseZbIKXNv7hsKj3O9UEz30c53MT9005gt2hxQ=
github.com/moby/locker v1.0.1 h1:fOXqR41zeveg4fFODix+1Ch4mj/gT0NE1XJbp/epuBg=
Expand Down
11 changes: 11 additions & 0 deletions pkg/apis/kosmos/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ type ClusterTreeOptions struct {
// LeafModels provide an api to arrange the member cluster with some rules to pretend one or more leaf node
// +optional
LeafModels []LeafModel `json:"leafModels,omitempty"`

// +kubebuilder:default="k8s"
// +optional
LeafType string `json:"leafType,omitempty"`

// secret?
// +optional
AccessKey string `json:"accressKey,omitempty"`

// +optional
SecretKey string `json:"secretKey,omitempty"`
}

type LeafModel struct {
Expand Down
33 changes: 24 additions & 9 deletions pkg/clustertree/cluster-manager/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers"
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/mcs"
podcontrollers "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pod"
leafpodsyncers "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pod/leaf-pod"
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pv"
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pvc"
leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils"
Expand Down Expand Up @@ -123,6 +124,19 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req
return controllerruntime.Result{RequeueAfter: RequeueTime}, err
}

if cluster.Spec.ClusterTreeOptions.LeafType == string(leafUtils.LeafTypeServerless) {
if !cluster.DeletionTimestamp.IsZero() {
// TODO:
return reconcile.Result{}, nil
}
// TODO: ....
if err := CreateOpenApiNode(ctx, cluster, c.RootClientset, c.Options); err != nil {
return controllerruntime.Result{RequeueAfter: RequeueTime}, err
}
// TODO: clean
return reconcile.Result{}, nil
}

config, err := utils.NewConfigFromBytes(cluster.Spec.Kubeconfig, func(config *rest.Config) {
config.QPS = utils.DefaultLeafKubeQPS
config.Burst = utils.DefaultLeafKubeBurst
Expand Down Expand Up @@ -206,7 +220,7 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req
c.ManagerCancelFuncs[cluster.Name] = &cancel
c.ControllerManagersLock.Unlock()

if err = c.setupControllers(mgr, cluster, nodes, leafDynamic, leafNodeSelectors, leafClient, kosmosClient, config); err != nil {
if err = c.setupControllers(mgr, cluster, nodes, leafDynamic, leafNodeSelectors, leafClient, kosmosClient, config, subContext); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to setup cluster %s controllers: %v", cluster.Name, err)
}

Expand Down Expand Up @@ -243,7 +257,8 @@ func (c *ClusterController) setupControllers(
leafNodeSelector map[string]kosmosv1alpha1.NodeSelector,
leafClientset kubernetes.Interface,
kosmosClient kosmosversioned.Interface,
leafRestConfig *rest.Config) error {
leafRestConfig *rest.Config,
subContext context.Context) error {
c.GlobalLeafManager.AddLeafResource(&leafUtils.LeafResource{
Client: mgr.GetClient(),
DynamicClient: clientDynamic,
Expand All @@ -255,6 +270,8 @@ func (c *ClusterController) setupControllers(
IgnoreLabels: strings.Split("", ","),
EnableServiceAccount: true,
RestConfig: leafRestConfig,
// LeafType: leafUtils.LeafTypeK8s,
LeafType: leafUtils.LeafTypeK8s,
}, cluster, nodes)

nodeResourcesController := controllers.NodeResourcesController{
Expand Down Expand Up @@ -293,14 +310,12 @@ func (c *ClusterController) setupControllers(
}
}

leafPodController := podcontrollers.LeafPodReconciler{
RootClient: c.Root,
Namespace: "",
}
leafPodWorkerQueue := podcontrollers.NewLeafPodWorkerQueue(&leafpodsyncers.LeafPodWorkerQueueOption{
Config: leafRestConfig,
RootClient: c.RootClientset,
}, leafUtils.LeafTypeK8s) // TODO:

if err := leafPodController.SetupWithManager(mgr); err != nil {
return fmt.Errorf("error starting podUpstreamReconciler %s: %v", podcontrollers.LeafPodControllerName, err)
}
go leafPodWorkerQueue.Run(subContext)

if !c.Options.OnewayStorageControllers {
err := c.setupStorageControllers(mgr, utils.IsOne2OneMode(cluster), cluster.Name)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package k8s

import (
"context"
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"

"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/runtime"
"github.com/kosmos.io/kosmos/pkg/utils/podutils"
)

type leafPodK8sSyncer struct {
LeafClient *kubernetes.Clientset
RootClient kubernetes.Interface
}

const (
LeafPodControllerName = "leaf-pod-controller"
LeafPodRequeueTime = 10 * time.Second
)

func DeletePodInRootCluster(ctx context.Context, rootnamespacedname runtime.NamespacedName, rootClient kubernetes.Interface) error {
rPod, err := rootClient.CoreV1().Pods(rootnamespacedname.Namespace).Get(ctx, rootnamespacedname.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return nil
} else {
return err
}
}

rPodCopy := rPod.DeepCopy()

if err := rootClient.CoreV1().Pods(rPodCopy.Namespace).Delete(ctx, rPodCopy.Name, metav1.DeleteOptions{
GracePeriodSeconds: new(int64),
}); err != nil {
if !errors.IsNotFound(err) {
return err
}
}

return nil
}

func (s *leafPodK8sSyncer) Reconcile(ctx context.Context, key runtime.NamespacedName) (runtime.Result, error) {
pod, err := s.LeafClient.CoreV1().Pods(key.Namespace).Get(ctx, key.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
// delete pod in root
if err := DeletePodInRootCluster(ctx, key, s.RootClient); err != nil {
return runtime.Result{RequeueAfter: LeafPodRequeueTime}, nil
}
return runtime.Result{}, nil
}

klog.Errorf("get %s error: %v", key, err)
return runtime.Result{RequeueAfter: LeafPodRequeueTime}, nil
}

podCopy := pod.DeepCopy()

// if ShouldSkipStatusUpdate(podCopy) {
// return reconcile.Result{}, nil
// }

if podutils.IsKosmosPod(podCopy) {
podutils.FitObjectMeta(&podCopy.ObjectMeta)
podCopy.ResourceVersion = "0"
if _, err := s.RootClient.CoreV1().Pods(podCopy.Namespace).UpdateStatus(ctx, podCopy, metav1.UpdateOptions{}); err != nil && !errors.IsNotFound(err) {
klog.V(4).Info(fmt.Sprintf("error while updating pod status in kubernetes: %s", err))
return runtime.Result{RequeueAfter: LeafPodRequeueTime}, nil
}
}
return runtime.Result{}, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package k8s

import (
"time"

"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"

leafpodsyncers "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/controllers/pod/leaf-pod"
"github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/runtime"
"github.com/kosmos.io/kosmos/pkg/utils"
"github.com/kosmos.io/kosmos/pkg/utils/podutils"
)

func NewLeafPodK8wWorkerQueue(opts *leafpodsyncers.LeafPodWorkerQueueOption) runtime.Controller {
// create the workqueue
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

client, err := kubernetes.NewForConfig(opts.Config)
if err != nil {
klog.Fatal(err)
}

// Create a shared informer factory for Kubernetes pods in the current namespace (if specified) and scheduled to the current node.
podInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(
client,
5*time.Second,
)

podInformer := podInformerFactory.Core().V1().Pods()

eventFilter := func(obj interface{}) (bool, *corev1.Pod) {
p, ok := obj.(*corev1.Pod)

if !ok {
klog.Fatal("convert pod error")
return false, p
}

if len(p.Spec.NodeName) == 0 {
return false, p
}

if p.GetNamespace() == utils.ReservedNS {
return false, p
}

return podutils.IsKosmosPod(p), p
}

_, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if flag, pod := eventFilter(obj); flag {
queue.Add(runtime.NamespacedName{
Name: pod.Name,
Namespace: pod.Namespace,
})
}
},
UpdateFunc: func(old interface{}, new interface{}) {
if flag, pod := eventFilter(old); flag {
if !cmp.Equal(old.(*corev1.Pod).Status, new.(*corev1.Pod).Status) {
queue.Add(runtime.NamespacedName{
Name: pod.Name,
Namespace: pod.Namespace,
})
}
}
},
DeleteFunc: func(obj interface{}) {
if flag, pod := eventFilter(obj); flag {
queue.Add(runtime.NamespacedName{
Name: pod.Name,
Namespace: pod.Namespace,
})
}
},
})

if err != nil {
klog.Fatalf("add event handler error: %s", err)
panic(err)
}

leafClient, err := kubernetes.NewForConfig(opts.Config)
if err != nil {
klog.Fatalf("could not build clientset for cluster %s", err)
panic(err)
}

leafK8sSyncer := &leafPodK8sSyncer{
LeafClient: leafClient,
RootClient: opts.RootClient,
}

return runtime.NewK8sWorkerQueue(queue, podInformer.Informer(), leafK8sSyncer)
}
Loading

0 comments on commit 4dbe546

Please sign in to comment.