Skip to content

Commit

Permalink
Merge branch 'rxymain' into rxymain1
Browse files Browse the repository at this point in the history
# Conflicts:
#	cmd/knode-manager/app/config/config.go
#	cmd/knode-manager/app/manager.go
#	cmd/knode-manager/app/options/options.go
#	pkg/knode-manager/knodemanager.go
#	pkg/knode-manager/result.go
#	pkg/kosmosctl/floater/doctor.go
#	pkg/kosmosctl/install/install.go
#	pkg/kosmosctl/kosmosctl.go
#	pkg/kosmosctl/manifest/manifest_clusterrolebindings.go
#	pkg/kosmosctl/manifest/manifest_clusterroles.go
#	pkg/kosmosctl/manifest/manifest_deployments.go
#	pkg/kosmosctl/manifest/manifest_serviceaccounts.go
#	pkg/kosmosctl/uninstall/uninstall.go
#	pkg/kosmosctl/util/builder.go
#	pkg/kosmosctl/util/check.go
  • Loading branch information
renxiangyu committed Oct 23, 2023
2 parents 27ab48e + 368c032 commit f4b06c2
Show file tree
Hide file tree
Showing 7 changed files with 309 additions and 0 deletions.
31 changes: 31 additions & 0 deletions pkg/knode-manager/adapters/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package adapters

import (
"context"

corev1 "k8s.io/api/core/v1"
)

type PodHandler interface {
Create(ctx context.Context, pod *corev1.Pod) error

Update(ctx context.Context, pod *corev1.Pod) error

Delete(ctx context.Context, pod *corev1.Pod) error

Get(ctx context.Context, namespace, name string) (*corev1.Pod, error)

GetStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error)

List(context.Context) ([]*corev1.Pod, error)

Notify(context.Context, func(*corev1.Pod))
}

type NodeHandler interface {
Trace(context.Context) error

NotifyStatus(ctx context.Context, cb func(*corev1.Node))

Configure(context.Context, *corev1.Node)
}
26 changes: 26 additions & 0 deletions pkg/knode-manager/adapters/k8s/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package k8sadapter

import (
"context"

corev1 "k8s.io/api/core/v1"

"github.com/kosmos.io/kosmos/pkg/network"
)

type NodeAdapter struct {
}

func NewNodeAdapter() (*NodeAdapter, error) {
return nil, network.ErrNotImplemented
}

func (n *NodeAdapter) Configure(ctx context.Context, node *corev1.Node) {
}

func (n *NodeAdapter) Trace(_ context.Context) error {
return network.ErrNotImplemented
}

func (n *NodeAdapter) NotifyStatus(_ context.Context, _ func(*corev1.Node)) {
}
44 changes: 44 additions & 0 deletions pkg/knode-manager/adapters/k8s/pod.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package k8sadapter

import (
"context"

corev1 "k8s.io/api/core/v1"

"github.com/kosmos.io/kosmos/pkg/network"
)

type PodAdapter struct {
}

func NewPodAdapter() (*PodAdapter, error) {
return nil, network.ErrNotImplemented
}

func (p *PodAdapter) Create(ctx context.Context, pod *corev1.Pod) error {
return network.ErrNotImplemented
}

func (p *PodAdapter) Update(ctx context.Context, pod *corev1.Pod) error {
return network.ErrNotImplemented
}

func (p *PodAdapter) Delete(ctx context.Context, pod *corev1.Pod) error {
return network.ErrNotImplemented
}

func (p *PodAdapter) Get(ctx context.Context, namespace string, name string) (*corev1.Pod, error) {
return nil, network.ErrNotImplemented
}

func (p *PodAdapter) GetStatus(ctx context.Context, namespace string, name string) (*corev1.PodStatus, error) {
return nil, network.ErrNotImplemented
}

func (p *PodAdapter) List(_ context.Context) ([]*corev1.Pod, error) {
return nil, network.ErrNotImplemented
}

func (p *PodAdapter) Notify(ctx context.Context, f func(*corev1.Pod)) {

}
32 changes: 32 additions & 0 deletions pkg/knode-manager/controllers/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package controllers

import (
"k8s.io/client-go/kubernetes"

"github.com/kosmos.io/kosmos/pkg/knode-manager/adapters"
)

type NodeController struct {
adapter adapters.NodeHandler
client kubernetes.Interface
}

func NewNodeController(adapter adapters.NodeHandler, client kubernetes.Interface) (*NodeController, error) {
return &NodeController{
adapter: adapter,
client: client,
}, nil
}

func (n *NodeController) applyNode() error {
return nil
}

func (n *NodeController) Run() error {
err := n.applyNode()
if err != nil {
return err
}

return nil
}
103 changes: 103 additions & 0 deletions pkg/knode-manager/controllers/pod.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package controllers

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
corev1informers "k8s.io/client-go/informers/core/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"

"github.com/kosmos.io/kosmos/pkg/knode-manager/adapters"
)

type PodConfig struct {
PodClient corev1client.PodsGetter

PodInformer corev1informers.PodInformer

EventRecorder record.EventRecorder

PodHandler adapters.PodHandler

ConfigMapInformer corev1informers.ConfigMapInformer
SecretInformer corev1informers.SecretInformer
ServiceInformer corev1informers.ServiceInformer
}

type PodController struct {
podHandler adapters.PodHandler

podsInformer corev1informers.PodInformer

podsLister corev1listers.PodLister

// nolint:unused
recorder record.EventRecorder

client corev1client.PodsGetter
}

func NewPodController(cfg PodConfig) (*PodController, error) {
if cfg.PodClient == nil {
return nil, fmt.Errorf("missing core client")
}
if cfg.EventRecorder == nil {
return nil, fmt.Errorf("missing event recorder")
}
if cfg.PodInformer == nil {
return nil, fmt.Errorf("missing pod informer")
}
if cfg.ConfigMapInformer == nil {
return nil, fmt.Errorf("missing config map informer")
}
if cfg.SecretInformer == nil {
return nil, fmt.Errorf("missing secret informer")
}
if cfg.ServiceInformer == nil {
return nil, fmt.Errorf("missing service informer")
}
if cfg.PodHandler == nil {
return nil, fmt.Errorf("missing podHandler")
}

pc := &PodController{
client: cfg.PodClient,
podsInformer: cfg.PodInformer,
podsLister: cfg.PodInformer.Lister(),
podHandler: cfg.PodHandler,
}

return pc, nil
}

func (pd *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var eventHandler cache.ResourceEventHandler = cache.ResourceEventHandlerFuncs{
AddFunc: func(pod interface{}) {
// nolint:errcheck
pd.podHandler.Create(ctx, &corev1.Pod{})
},
UpdateFunc: func(oldObj, newObj interface{}) {
// nolint:errcheck
pd.podHandler.Update(ctx, &corev1.Pod{})
},
DeleteFunc: func(pod interface{}) {
// nolint:errcheck
pd.podHandler.Delete(ctx, &corev1.Pod{})
},
}

if _, err := pd.podsInformer.Informer().AddEventHandler(eventHandler); err != nil {
return err
}

<-ctx.Done()

return nil
}
72 changes: 72 additions & 0 deletions pkg/knode-manager/knode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package knodemanager

import (
"context"
"fmt"

"github.com/pkg/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
klogv2 "k8s.io/klog/v2"

"github.com/kosmos.io/kosmos/cmd/knode-manager/app/config"
kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/knode-manager/adapters"
k8sadapter "github.com/kosmos.io/kosmos/pkg/knode-manager/adapters/k8s"
"github.com/kosmos.io/kosmos/pkg/knode-manager/controllers"
)

type Knode struct {
podController *controllers.PodController
nodeController *controllers.NodeController
}

func NewKnode(_ context.Context, knode *kosmosv1alpha1.Knode, c *config.Opts) (*Knode, error) {
kubeconfig, err := clientcmd.BuildConfigFromFlags("", c.KubeConfigPath)
if err != nil {
return nil, fmt.Errorf("failed to build master kubeconfig: %v", err)
}
kubeconfig.QPS, kubeconfig.Burst = c.KubeAPIQPS, c.KubeAPIBurst
mClient, err := kubernetes.NewForConfig(kubeconfig)
if err != nil {
return nil, fmt.Errorf("failed to new master clientset: %v", err)
}

var podAdapter adapters.PodHandler
var nodeAdapter adapters.NodeHandler
if knode.Spec.Type == kosmosv1alpha1.K8sAdapter {
podAdapter, err = k8sadapter.NewPodAdapter()
if err != nil {
return nil, err
}
nodeAdapter, err = k8sadapter.NewNodeAdapter()
if err != nil {
return nil, err
}
}

pc, err := controllers.NewPodController(controllers.PodConfig{
PodHandler: podAdapter,
})
if err != nil {
return nil, err
}

nc, err := controllers.NewNodeController(nodeAdapter, mClient)
if err != nil {
return nil, err
}

return &Knode{
podController: pc,
nodeController: nc,
}, nil
}

func (kn *Knode) Run(ctx context.Context, c *config.Opts) {
go func() {
if err := kn.podController.Run(ctx, c.PodSyncWorkers); err != nil && errors.Cause(err) != context.Canceled {
klogv2.Fatal(err)
}
}()
}
1 change: 1 addition & 0 deletions pkg/knode-manager/utils/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package utils

0 comments on commit f4b06c2

Please sign in to comment.