diff --git a/Chapter2/README.md b/Chapter2/README.md new file mode 100644 index 0000000..80a95ef --- /dev/null +++ b/Chapter2/README.md @@ -0,0 +1,147 @@ +# Schedule a pod and notice the cluster +## List and filter resource we are interesting in +Now we can communicate with the cluster by the help of client SDK. Let us move on, to complete our scheduler. For the simplest case, the pod and node are the minimum information we need to schedule. Let us add some code to query them from the cluster. However, other than the node, not all the pod we are interested in, right? We can use a filter to filter out these that need to schedule and these need schedule by our scheduler. Thanks to client SDK again, they had already provided such interface. +``` +... +// interestedPod selects pods that are assigned (scheduled and running). +func interestedPod(pod *v1.Pod) bool { + return pod.Spec.SchedulerName == SchedulerName && len(pod.Spec.NodeName) == 0 +} +... + informerfactory.Core().V1().Pods().Informer().AddEventHandler( + cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch t := obj.(type) { + case *v1.Pod: + return !interestedPod(t) + case cache.DeletedFinalStateUnknown: + if pod, ok := t.Obj.(*v1.Pod); ok { + return !interestedPod(pod) + } + fmt.Errorf("unable to convert object %T to *v1.Pod", obj) + return false + default: + fmt.Errorf("unable to handle object in %T", obj) + return false + } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: addPod, + }, + }, + ) + + informerfactory.Core().V1().Nodes().Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: addNode, + }, + ) +``` +Question: + +Could you list resource as possible as you can that a scheduler may interest in? + +Extending reading: addAllEventHandlers is the function that kube-scheduler adds events to watch on resouces. The source code is here: [The resource that kube scheduler list and watch](https://github.com/kubernetes/kubernetes/blob/master/pkg/scheduler/eventhandlers.go) +``` +... +// addAllEventHandlers is a helper function used in tests and in Scheduler +// to add event handlers for various informers. +func addAllEventHandlers( + sched *Scheduler, + informerFactory informers.SharedInformerFactory, +) { + // scheduled pod cache + informerFactory.Core().V1().Pods().Informer().AddEventHandler( + cache.FilteringResourceEventHandler{ +... +``` +Questions: +1. For a great number of pod and nodes, how should we store them, and which form of data struct to use? +[Scheduler cache](https://github.com/kubernetes/kubernetes/blob/master/pkg/scheduler/internal/cache/interface.go) +2. Should we schedule the pod based on first in first out? Could you suggest a more reasonable way? +[Priority queue](https://github.com/kubernetes/kubernetes/blob/master/pkg/scheduler/internal/queue/scheduling_queue.go) + +## Bind node and notice the cluster +Now we have enough information to schedule a pod. We should choose the node that the pod run on. This is the so-called filter and score procedure in the kube-scheduler. [filter and score](https://kubernetes.io/docs/concepts/scheduling-eviction/kube-scheduler/#kube-scheduler-implementation) +For our test environment there is only one node and we just want to see the schedule result so let us skip the filter and score. Move on to the next phase. We should tell the cluster our schedule result. This phase called binding and of course there is an api in client SDK to help you. +``` + binding := &v1.Binding{ + ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID}, + Target: v1.ObjectReference{Kind: "Node", Name: pod.Spec.NodeName}, + } + err := client.CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{}) +``` +Let us test our scheduler. Deploy a nginx which specify the schedule name to our scheduler. +``` +apiVersion: v1 +kind: Pod +metadata: + name: webserver +spec: + containers: + - name: webserver # The name that this container will have. + image: nginx:1.14.0 # The image on which it is based. + hostNetwork: true + schedulerName: "StupidScheduler" +``` +Deploy before start our scheduler. +``` +$ kubectl apply -f nginx_deployment.yaml +pod/webserver created +$ kubectl get pods +NAME READY STATUS RESTARTS AGE +webserver 0/1 Pending 0 4s +``` +We can see the pod is in pending status. +Now start our scheduler +``` +# go run scheduler.go ~/scheduler.conf +add event for pod default/webserver +add event for node "test" +Success bind pod default/webserver +``` +We can see our pod is running now. +``` +$ kubectl get pods +NAME READY STATUS RESTARTS AGE +webserver 1/1 Running 0 2m3s +``` +Test wherthe it works +``` +$ curl localhost:80 + + + +Welcome to nginx! + + + +

Welcome to nginx!

+

If you see this page, the nginx web server is successfully installed and +working. Further configuration is required.

+ +

For online documentation and support please refer to +nginx.org.
+Commercial support is available at +nginx.com.

+ +

Thank you for using nginx.

+ + +``` +If you see this. Congratulation! You just finish a your own kubernetes scheduler. +This basic tutorial just help you understand the kubernetes scheduler. Here we skip many problems occurs in the production environment. You can continue to read the advance topic or find the answer in the kube-scheduler. +More questions: +1. Can you split the schedule procedure to different phases? And point out each phase main task? +[Kube scheduler framework](https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/) +2. Can a single scheduler to schedule pod with different policy according to its configuration? +[Kube scheduler profile]() + + + diff --git a/Chapter2/TestNginx.yaml b/Chapter2/TestNginx.yaml new file mode 100644 index 0000000..3b01dad --- /dev/null +++ b/Chapter2/TestNginx.yaml @@ -0,0 +1,10 @@ +apiVersion: v1 +kind: Pod +metadata: + name: webserver +spec: + containers: + - name: webserver # The name that this container will have. + image: nginx:1.14.0 # The image on which it is based. + hostNetwork: true + schedulerName: "StupidScheduler" \ No newline at end of file diff --git a/Chapter2/scheduler.go b/Chapter2/scheduler.go new file mode 100644 index 0000000..bf4cc1b --- /dev/null +++ b/Chapter2/scheduler.go @@ -0,0 +1,162 @@ +/* +Copyright 2021 Wuming Liu (lwmqwer@163.com) + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package main + +import ( + "context" + "fmt" + "os" + "sync" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + clientset "k8s.io/client-go/kubernetes" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/clientcmd" +) + +const ( + schedulerName = "StupidScheduler" +) + +var ( + podslock sync.Mutex + nodeslock sync.Mutex + pods map[string]*v1.Pod + nodes map[string]*v1.Node +) + +func addPod(obj interface{}) { + pod := obj.(*v1.Pod) + fmt.Printf("add event for pod %s/%s\n", pod.Namespace, pod.Name) + podslock.Lock() + defer podslock.Unlock() + pods[pod.Namespace+pod.Name] = pod +} + +func addNode(obj interface{}) { + node := obj.(*v1.Node) + fmt.Printf("add event for node %q\n", node.Name) + nodeslock.Lock() + defer nodeslock.Unlock() + nodes[node.Name] = node +} + +// interestedPod selects pods that are assigned (scheduled and running). +func interestedPod(pod *v1.Pod) bool { + return pod.Spec.SchedulerName == schedulerName && len(pod.Spec.NodeName) == 0 +} + +func scheduleOne(ctx context.Context, client clientset.Interface) { + var pod *v1.Pod + + podslock.Lock() + for k, v := range pods { + pod = v + delete(pods, k) + break + } + podslock.Unlock() + if pod == nil { + return + } + nodeslock.Lock() + for _, v := range nodes { + pod.Spec.NodeName = v.Name + break + } + nodeslock.Unlock() + binding := &v1.Binding{ + ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID}, + Target: v1.ObjectReference{Kind: "Node", Name: pod.Spec.NodeName}, + } + err := client.CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{}) + if err != nil { + fmt.Printf("failed bind pod %s/%s\n", pod.Namespace, pod.Name) + pod.Spec.NodeName = "" + podslock.Lock() + defer podslock.Unlock() + pods[pod.Namespace+pod.Name] = pod + } + fmt.Printf("Success bind pod %s/%s\n", pod.Namespace, pod.Name) +} + +func main() { + pods = make(map[string]*v1.Pod) + nodes = make(map[string]*v1.Node) + // Prepare for informerfactory + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if len(os.Args) <= 1 || len(os.Args[1]) == 0 { + panic("No --kubeconfig was specified. Using default API client. This might not work") + } + + // This creates a client, load kubeconfig + kubeConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + &clientcmd.ClientConfigLoadingRules{ExplicitPath: os.Args[1]}, nil).ClientConfig() + if err != nil { + os.Exit(-1) + } + client, err := clientset.NewForConfig(restclient.AddUserAgent(kubeConfig, "scheduler")) + if err != nil { + os.Exit(-1) + } + informerfactory := informers.NewSharedInformerFactory(client, 0) + // Here we only care about pods and nodes. + informerfactory.Core().V1().Pods().Lister() + informerfactory.Core().V1().Nodes().Lister() + // Start all informers. + informerfactory.Start(ctx.Done()) + + // Wait for all caches to sync before scheduling. + informerfactory.WaitForCacheSync(ctx.Done()) + + informerfactory.Core().V1().Pods().Informer().AddEventHandler( + cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch t := obj.(type) { + case *v1.Pod: + return interestedPod(t) + case cache.DeletedFinalStateUnknown: + if pod, ok := t.Obj.(*v1.Pod); ok { + return interestedPod(pod) + } + fmt.Errorf("unable to convert object %T to *v1.Pod", obj) + return false + default: + fmt.Errorf("unable to handle object in %T", obj) + return false + } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: addPod, + }, + }, + ) + + informerfactory.Core().V1().Nodes().Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: addNode, + }, + ) + + for { + scheduleOne(ctx, client) + } +} diff --git a/README.md b/README.md index ddd52cd..4b89fc0 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ Before starting this tutorial, you still need some prerequisites for it: ## 1. [Communicate with the kubernetes cluster](Chapter1/README.md) +## 2. [Schedule a pod and notice the cluster](Chapter2/README.md) # Advance topics ## 1. [Metrics and logs]() ## 2. [Scheduler Cache and priority queue]()