Skip to content

Commit

Permalink
draft: support serverless
Browse files Browse the repository at this point in the history
Signed-off-by: OrangeBao <[email protected]>
  • Loading branch information
OrangeBao committed Dec 5, 2023
1 parent 6342771 commit b8ec042
Show file tree
Hide file tree
Showing 9 changed files with 685 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/clustertree/cluster-manager/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req
return reconcile.Result{}, nil
}

// TODO: @byh k8s or openApi
// if openApi launch special node and pod controller
// else ...

// build mgr for cluster
// TODO bug, the v4 log is lost
mgr, err := controllerruntime.NewManager(config, controllerruntime.Options{
Expand Down
38 changes: 38 additions & 0 deletions pkg/clustertree/cluster-manager/resource-manager/k8s/pod.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package k8s

import (
"context"
"fmt"

// resourcemanager "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/resource-manager"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type PodResourceHandler struct {
// resourcemanager.ResourceHandler
A string
}

func (p *PodResourceHandler) Create(ctx context.Context, obj client.Object, opts any) error {
o := opts.(client.CreateOption)
fmt.Println("hhhhh", o)
return nil
}

func (p *PodResourceHandler) Delete(ctx context.Context, obj client.Object, opts any) error {
o := opts.(string)
fmt.Println("hhhhh", p.A, o)
return nil
}

func (p *PodResourceHandler) Update(ctx context.Context, obj client.Object, opts any) error {
return nil
}

func (p *PodResourceHandler) Get(ctx context.Context, obj client.Object, opts any) error {
return nil
}

func (p *PodResourceHandler) List(ctx context.Context, objs client.ObjectList, opts any) error {
return nil
}
23 changes: 23 additions & 0 deletions pkg/clustertree/cluster-manager/resource-manager/leaf_resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package resourcemanager

import (
"context"

"sigs.k8s.io/controller-runtime/pkg/client"
)

type ResourceHandler interface {
Create(ctx context.Context, obj client.Object, opts any) error
Delete(ctx context.Context, obj client.Object, opts any) error
Update(ctx context.Context, obj client.Object, opts any) error
Get(ctx context.Context, obj client.Object, opts any) error
List(ctx context.Context, objs client.ObjectList, opts any) error
}

type PodResourceHandler interface {
ResourceHandler
}

type NodeResourceHandler interface {
ResourceHandler
}
37 changes: 37 additions & 0 deletions pkg/clustertree/cluster-manager/resource-manager/open-api/pod.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package openapi

import (
"context"
"fmt"

// resourcemanager "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/resource-manager"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type PodResourceHandler struct {
// resourcemanager.ResourceHandler
}

func (p *PodResourceHandler) Create(ctx context.Context, obj client.Object, opts any) error {
o := opts.(client.CreateOption)
fmt.Println("openapi", o)
return nil
}

func (p *PodResourceHandler) Delete(ctx context.Context, obj client.Object, opts any) error {
o := opts.(string)
fmt.Println("openapi", o)
return nil
}

func (p *PodResourceHandler) Update(ctx context.Context, obj client.Object, opts any) error {
return nil
}

func (p *PodResourceHandler) Get(ctx context.Context, obj client.Object, opts any) error {
return nil
}

func (p *PodResourceHandler) List(ctx context.Context, objs client.ObjectList, opts any) error {
return nil
}
88 changes: 88 additions & 0 deletions pkg/runtime/bizqueue/k8s.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package bizqueue

import (
"flag"
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"

"github.com/kosmos.io/kosmos/pkg/runtime"
)

type rootK8sK8sSyncer struct {
}

func (r *rootK8sK8sSyncer) Reconcile(key string) error {
fmt.Printf("rootK8sK8sSyncer::: key %s \n", key)
return nil
}

func NewRootK8sK8sSyncer() runtime.Reconciler {
return &rootK8sK8sSyncer{}
}

func TestK8sController() {
var kubeconfig string
var master string

flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
flag.StringVar(&master, "master", "", "master url")
flag.Parse()

// creates the connection
config, err := clientcmd.BuildConfigFromFlags(master, kubeconfig)
if err != nil {
klog.Fatal(err)
}

// creates the clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatal(err)
}

// create the pod watcher
// podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", "byh", fields.Everything())

// create the workqueue
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

// Bind the workqueue to a cache with the help of an informer. This way we make sure that
// whenever the cache is updated, the pod key is added to the workqueue.
// Note that when we finally process the item from the workqueue, we might see a newer version
// of the Pod than the version which was responsible for triggering the update.
_, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
}, cache.Indexers{})

c := runtime.NewK8sController(queue, informer, NewRootK8sK8sSyncer())

stop := make(chan struct{})
c.Run(1, stop)
}
41 changes: 41 additions & 0 deletions pkg/runtime/bizqueue/open-api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package bizqueue

import (
"fmt"
"time"

"k8s.io/client-go/util/workqueue"

"github.com/kosmos.io/kosmos/pkg/runtime"
)

type rootK8sOpenApiSyncer struct {
}

func (r *rootK8sOpenApiSyncer) Reconcile(key string) error {
fmt.Printf("rootK8sOpenApiSyncer::: key %s \n", key)
return nil
}

func NewRootK8sOpenApiSyncer() runtime.Reconciler {
return &rootK8sOpenApiSyncer{}
}

func TestOpenApiController() {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

c := runtime.NewOpenApiController(queue, NewRootK8sOpenApiSyncer())

stop := make(chan struct{})

go func() {
time.Sleep(3 * time.Second)
queue.Add("hello~")
time.Sleep(3 * time.Second)
queue.Add("world~")
time.Sleep(3 * time.Second)
stop <- struct{}{}
}()

c.Run(1, stop)
}
67 changes: 67 additions & 0 deletions pkg/runtime/bizqueue/root-leaf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package bizqueue

import (
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"

kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/runtime"
leafUtils "github.com/kosmos.io/kosmos/pkg/runtime/utils"
)

const (
K8s = "K8s"
OpenApi = "OpenApi"
)

type rootK8sToLeafSyncer struct {
}

func (r *rootK8sToLeafSyncer) Reconcile(key string) error {
fmt.Printf("rootK8sToLeafSyncer::: key %s \n", key)
// key -> node -> cluster
globalleafManager := leafUtils.GetGlobalLeafResourceManager()

lr, err := globalleafManager.GetLeafResource(key)
if err != nil {
// fmt.Errorf("err: %s", err)
return err
}

return lr.Reconcile(key)
}

func NewRootK8sToLeafSyncer() runtime.Reconciler {
return &rootK8sToLeafSyncer{}
}

func TestRootToLeaf() {
globalleafManager := leafUtils.GetGlobalLeafResourceManager()

clusterK8s := &kosmosv1alpha1.Cluster{}
clusterK8s.Name = K8s
clusterOpenApi := &kosmosv1alpha1.Cluster{}
clusterOpenApi.Name = OpenApi
globalleafManager.AddLeafResource(&leafUtils.K8sLeafResource{}, clusterK8s, []*corev1.Node{})
globalleafManager.AddLeafResource(&leafUtils.OpenApiLeafResource{}, clusterOpenApi, []*corev1.Node{})

queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

c := runtime.NewOpenApiController(queue, NewRootK8sToLeafSyncer())

stop := make(chan struct{})

go func() {
time.Sleep(3 * time.Second)
queue.Add(K8s)
time.Sleep(3 * time.Second)
queue.Add(OpenApi)
time.Sleep(3 * time.Second)
stop <- struct{}{}
}()

c.Run(1, stop)
}
Loading

0 comments on commit b8ec042

Please sign in to comment.