diff --git a/pkg/knode-manager/adapters/interface.go b/pkg/knode-manager/adapters/interface.go new file mode 100644 index 000000000..44812398d --- /dev/null +++ b/pkg/knode-manager/adapters/interface.go @@ -0,0 +1,31 @@ +package adapters + +import ( + "context" + + corev1 "k8s.io/api/core/v1" +) + +type PodHandler interface { + Create(ctx context.Context, pod *corev1.Pod) error + + Update(ctx context.Context, pod *corev1.Pod) error + + Delete(ctx context.Context, pod *corev1.Pod) error + + Get(ctx context.Context, namespace, name string) (*corev1.Pod, error) + + GetStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error) + + List(context.Context) ([]*corev1.Pod, error) + + Notify(context.Context, func(*corev1.Pod)) +} + +type NodeHandler interface { + Trace(context.Context) error + + NotifyStatus(ctx context.Context, cb func(*corev1.Node)) + + Configure(context.Context, *corev1.Node) +} diff --git a/pkg/knode-manager/adapters/k8s/node.go b/pkg/knode-manager/adapters/k8s/node.go new file mode 100644 index 000000000..e6f14272b --- /dev/null +++ b/pkg/knode-manager/adapters/k8s/node.go @@ -0,0 +1,26 @@ +package k8sadapter + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + + "github.com/kosmos.io/kosmos/pkg/network" +) + +type NodeAdapter struct { +} + +func NewNodeAdapter() (*NodeAdapter, error) { + return nil, network.ErrNotImplemented +} + +func (n *NodeAdapter) Configure(ctx context.Context, node *corev1.Node) { +} + +func (n *NodeAdapter) Trace(_ context.Context) error { + return network.ErrNotImplemented +} + +func (n *NodeAdapter) NotifyStatus(_ context.Context, _ func(*corev1.Node)) { +} diff --git a/pkg/knode-manager/adapters/k8s/pod.go b/pkg/knode-manager/adapters/k8s/pod.go new file mode 100644 index 000000000..509dbcfe5 --- /dev/null +++ b/pkg/knode-manager/adapters/k8s/pod.go @@ -0,0 +1,44 @@ +package k8sadapter + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + + "github.com/kosmos.io/kosmos/pkg/network" +) + +type PodAdapter struct { +} + +func NewPodAdapter() (*PodAdapter, error) { + return nil, network.ErrNotImplemented +} + +func (p *PodAdapter) Create(ctx context.Context, pod *corev1.Pod) error { + return network.ErrNotImplemented +} + +func (p *PodAdapter) Update(ctx context.Context, pod *corev1.Pod) error { + return network.ErrNotImplemented +} + +func (p *PodAdapter) Delete(ctx context.Context, pod *corev1.Pod) error { + return network.ErrNotImplemented +} + +func (p *PodAdapter) Get(ctx context.Context, namespace string, name string) (*corev1.Pod, error) { + return nil, network.ErrNotImplemented +} + +func (p *PodAdapter) GetStatus(ctx context.Context, namespace string, name string) (*corev1.PodStatus, error) { + return nil, network.ErrNotImplemented +} + +func (p *PodAdapter) List(_ context.Context) ([]*corev1.Pod, error) { + return nil, network.ErrNotImplemented +} + +func (p *PodAdapter) Notify(ctx context.Context, f func(*corev1.Pod)) { + +} diff --git a/pkg/knode-manager/controllers/node.go b/pkg/knode-manager/controllers/node.go new file mode 100644 index 000000000..ce3821c7d --- /dev/null +++ b/pkg/knode-manager/controllers/node.go @@ -0,0 +1,32 @@ +package controllers + +import ( + "k8s.io/client-go/kubernetes" + + "github.com/kosmos.io/kosmos/pkg/knode-manager/adapters" +) + +type NodeController struct { + adapter adapters.NodeHandler + client kubernetes.Interface +} + +func NewNodeController(adapter adapters.NodeHandler, client kubernetes.Interface) (*NodeController, error) { + return &NodeController{ + adapter: adapter, + client: client, + }, nil +} + +func (n *NodeController) applyNode() error { + return nil +} + +func (n *NodeController) Run() error { + err := n.applyNode() + if err != nil { + return err + } + + return nil +} diff --git a/pkg/knode-manager/controllers/pod.go b/pkg/knode-manager/controllers/pod.go new file mode 100644 index 000000000..00976bd67 --- /dev/null +++ b/pkg/knode-manager/controllers/pod.go @@ -0,0 +1,103 @@ +package controllers + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + corev1informers "k8s.io/client-go/informers/core/v1" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + + "github.com/kosmos.io/kosmos/pkg/knode-manager/adapters" +) + +type PodConfig struct { + PodClient corev1client.PodsGetter + + PodInformer corev1informers.PodInformer + + EventRecorder record.EventRecorder + + PodHandler adapters.PodHandler + + ConfigMapInformer corev1informers.ConfigMapInformer + SecretInformer corev1informers.SecretInformer + ServiceInformer corev1informers.ServiceInformer +} + +type PodController struct { + podHandler adapters.PodHandler + + podsInformer corev1informers.PodInformer + + podsLister corev1listers.PodLister + + // nolint:unused + recorder record.EventRecorder + + client corev1client.PodsGetter +} + +func NewPodController(cfg PodConfig) (*PodController, error) { + if cfg.PodClient == nil { + return nil, fmt.Errorf("missing core client") + } + if cfg.EventRecorder == nil { + return nil, fmt.Errorf("missing event recorder") + } + if cfg.PodInformer == nil { + return nil, fmt.Errorf("missing pod informer") + } + if cfg.ConfigMapInformer == nil { + return nil, fmt.Errorf("missing config map informer") + } + if cfg.SecretInformer == nil { + return nil, fmt.Errorf("missing secret informer") + } + if cfg.ServiceInformer == nil { + return nil, fmt.Errorf("missing service informer") + } + if cfg.PodHandler == nil { + return nil, fmt.Errorf("missing podHandler") + } + + pc := &PodController{ + client: cfg.PodClient, + podsInformer: cfg.PodInformer, + podsLister: cfg.PodInformer.Lister(), + podHandler: cfg.PodHandler, + } + + return pc, nil +} + +func (pd *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var eventHandler cache.ResourceEventHandler = cache.ResourceEventHandlerFuncs{ + AddFunc: func(pod interface{}) { + // nolint:errcheck + pd.podHandler.Create(ctx, &corev1.Pod{}) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + // nolint:errcheck + pd.podHandler.Update(ctx, &corev1.Pod{}) + }, + DeleteFunc: func(pod interface{}) { + // nolint:errcheck + pd.podHandler.Delete(ctx, &corev1.Pod{}) + }, + } + + if _, err := pd.podsInformer.Informer().AddEventHandler(eventHandler); err != nil { + return err + } + + <-ctx.Done() + + return nil +} diff --git a/pkg/knode-manager/knode.go b/pkg/knode-manager/knode.go new file mode 100644 index 000000000..af69cfbec --- /dev/null +++ b/pkg/knode-manager/knode.go @@ -0,0 +1,72 @@ +package knodemanager + +import ( + "context" + "fmt" + + "github.com/pkg/errors" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + klogv2 "k8s.io/klog/v2" + + "github.com/kosmos.io/kosmos/cmd/knode-manager/app/config" + kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/knode-manager/adapters" + k8sadapter "github.com/kosmos.io/kosmos/pkg/knode-manager/adapters/k8s" + "github.com/kosmos.io/kosmos/pkg/knode-manager/controllers" +) + +type Knode struct { + podController *controllers.PodController + nodeController *controllers.NodeController +} + +func NewKnode(_ context.Context, knode *kosmosv1alpha1.Knode, c *config.Opts) (*Knode, error) { + kubeconfig, err := clientcmd.BuildConfigFromFlags("", c.KubeConfigPath) + if err != nil { + return nil, fmt.Errorf("failed to build master kubeconfig: %v", err) + } + kubeconfig.QPS, kubeconfig.Burst = c.KubeAPIQPS, c.KubeAPIBurst + mClient, err := kubernetes.NewForConfig(kubeconfig) + if err != nil { + return nil, fmt.Errorf("failed to new master clientset: %v", err) + } + + var podAdapter adapters.PodHandler + var nodeAdapter adapters.NodeHandler + if knode.Spec.Type == kosmosv1alpha1.K8sAdapter { + podAdapter, err = k8sadapter.NewPodAdapter() + if err != nil { + return nil, err + } + nodeAdapter, err = k8sadapter.NewNodeAdapter() + if err != nil { + return nil, err + } + } + + pc, err := controllers.NewPodController(controllers.PodConfig{ + PodHandler: podAdapter, + }) + if err != nil { + return nil, err + } + + nc, err := controllers.NewNodeController(nodeAdapter, mClient) + if err != nil { + return nil, err + } + + return &Knode{ + podController: pc, + nodeController: nc, + }, nil +} + +func (kn *Knode) Run(ctx context.Context, c *config.Opts) { + go func() { + if err := kn.podController.Run(ctx, c.PodSyncWorkers); err != nil && errors.Cause(err) != context.Canceled { + klogv2.Fatal(err) + } + }() +} diff --git a/pkg/knode-manager/utils/util.go b/pkg/knode-manager/utils/util.go new file mode 100644 index 000000000..d4b585bf7 --- /dev/null +++ b/pkg/knode-manager/utils/util.go @@ -0,0 +1 @@ +package utils