Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[engine-1.21] Reduce node startup time #4362

Merged
merged 3 commits into from
Oct 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 29 additions & 17 deletions pkg/agent/flannel/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ import (
"os"
"path/filepath"
"strings"
"time"

"github.com/pkg/errors"
"github.com/rancher/k3s/pkg/agent/util"
"github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/version"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/apimachinery/pkg/fields"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

const (
Expand Down Expand Up @@ -77,22 +79,10 @@ func Prepare(ctx context.Context, nodeConfig *config.Node) error {
return createFlannelConf(nodeConfig)
}

func Run(ctx context.Context, nodeConfig *config.Node, nodes v1.NodeInterface) error {
nodeName := nodeConfig.AgentConfig.NodeName

for {
node, err := nodes.Get(ctx, nodeName, metav1.GetOptions{})
if err == nil && node.Spec.PodCIDR != "" {
break
}
if err == nil {
logrus.Info("Waiting for node " + nodeName + " CIDR not assigned yet")
} else {
logrus.Infof("Waiting for node %s: %v", nodeName, err)
}
time.Sleep(2 * time.Second)
func Run(ctx context.Context, nodeConfig *config.Node, nodes typedcorev1.NodeInterface) error {
if err := waitForPodCIDR(ctx, nodeConfig.AgentConfig.NodeName, nodes); err != nil {
return errors.Wrap(err, "failed to wait for PodCIDR assignment")
}
logrus.Info("Node CIDR assigned for: " + nodeName)

go func() {
err := flannel(ctx, nodeConfig.FlannelIface, nodeConfig.FlannelConf, nodeConfig.AgentConfig.KubeConfigKubelet)
Expand All @@ -102,6 +92,28 @@ func Run(ctx context.Context, nodeConfig *config.Node, nodes v1.NodeInterface) e
return nil
}

// waitForPodCIDR watches the node named nodeName and returns nil once an event
// carrying a non-empty Spec.PodCIDR has been observed.
//
// It returns an error if the watch cannot be established, if an event carries
// an object that is not a *corev1.Node, or if the watch channel is closed
// before a PodCIDR was seen. In the last case the context's error is returned
// when the context has ended; otherwise a descriptive error is returned, so a
// closed watch is never mistaken for a successful assignment.
func waitForPodCIDR(ctx context.Context, nodeName string, nodes typedcorev1.NodeInterface) error {
	// Restrict the watch to the single node we care about.
	fieldSelector := fields.Set{metav1.ObjectNameField: nodeName}.String()
	watcher, err := nodes.Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
	if err != nil {
		return err
	}
	defer watcher.Stop()

	for ev := range watcher.ResultChan() {
		node, ok := ev.Object.(*corev1.Node)
		if !ok {
			return fmt.Errorf("could not convert event object to node: %v", ev)
		}
		if node.Spec.PodCIDR != "" {
			logrus.Info("PodCIDR assigned for node " + nodeName)
			return nil
		}
	}

	// The result channel was closed without ever delivering a node with a
	// PodCIDR; report that instead of claiming success.
	if err := ctx.Err(); err != nil {
		return err
	}
	return fmt.Errorf("watch channel closed before PodCIDR was assigned for node %s", nodeName)
}

func createCNIConf(dir string) error {
if dir == "" {
return nil
Expand Down
42 changes: 20 additions & 22 deletions pkg/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ import (
"github.com/rancher/k3s/pkg/rootless"
"github.com/rancher/k3s/pkg/util"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/clientcmd"
app2 "k8s.io/kubernetes/cmd/kube-proxy/app"
kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
Expand Down Expand Up @@ -106,16 +108,16 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {

util.WaitForAPIServerReady(coreClient, 30*time.Second)

if err := configureNode(ctx, &nodeConfig.AgentConfig, coreClient.CoreV1().Nodes()); err != nil {
return err
}

if !nodeConfig.NoFlannel {
if err := flannel.Run(ctx, nodeConfig, coreClient.CoreV1().Nodes()); err != nil {
return err
}
}

if err := configureNode(ctx, &nodeConfig.AgentConfig, coreClient.CoreV1().Nodes()); err != nil {
return err
}

if !nodeConfig.AgentConfig.DisableNPC {
if err := netpol.Run(ctx, nodeConfig); err != nil {
return err
Expand Down Expand Up @@ -220,17 +222,18 @@ func Run(ctx context.Context, cfg cmds.Agent) error {
return run(ctx, cfg, proxy)
}

func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes v1.NodeInterface) error {
count := 0
for {
node, err := nodes.Get(ctx, agentConfig.NodeName, metav1.GetOptions{})
if err != nil {
if count%30 == 0 {
logrus.Infof("Waiting for kubelet to be ready on node %s: %v", agentConfig.NodeName, err)
}
count++
time.Sleep(1 * time.Second)
continue
func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes typedcorev1.NodeInterface) error {
fieldSelector := fields.Set{metav1.ObjectNameField: agentConfig.NodeName}.String()
watch, err := nodes.Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
if err != nil {
return err
}
defer watch.Stop()

for ev := range watch.ResultChan() {
node, ok := ev.Object.(*corev1.Node)
if !ok {
return fmt.Errorf("could not convert event object to node: %v", ev)
}

updateNode := false
Expand Down Expand Up @@ -260,12 +263,7 @@ func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes v
if updateNode {
if _, err := nodes.Update(ctx, node, metav1.UpdateOptions{}); err != nil {
logrus.Infof("Failed to update node %s: %v", agentConfig.NodeName, err)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
continue
}
continue
}
logrus.Infof("labels have been set successfully on node: %s", agentConfig.NodeName)
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/tunnel/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
}
endpoint, ok := ev.Object.(*v1.Endpoints)
if !ok {
logrus.Errorf("Tunnel could not case event object to endpoint: %v", ev)
logrus.Errorf("Tunnel could not convert event object to endpoint: %v", ev)
continue watching
}

Expand Down
100 changes: 90 additions & 10 deletions pkg/daemons/executor/embed.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,25 @@ package executor

import (
"context"
"errors"
"fmt"
"net/http"
"runtime"

"github.com/rancher/k3s/pkg/cli/cmds"
daemonconfig "github.com/rancher/k3s/pkg/daemons/config"
"github.com/rancher/k3s/pkg/version"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/authentication/authenticator"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/clientcmd"
ccm "k8s.io/cloud-provider"
cloudprovider "k8s.io/cloud-provider"
cloudproviderapi "k8s.io/cloud-provider/api"
ccmapp "k8s.io/cloud-provider/app"
cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
ccmopt "k8s.io/cloud-provider/options"
Expand All @@ -29,16 +38,19 @@ import (
)

func init() {
executor = Embedded{}
executor = &Embedded{}
}

type Embedded struct{}
type Embedded struct {
nodeConfig *daemonconfig.Node
}

func (Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error {
// Bootstrap stores nodeConfig on the executor so that it can be read later by
// other methods (Scheduler waits for this field to become non-nil). The ctx
// and cfg arguments are not used here. It always returns nil.
func (e *Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error {
	e.nodeConfig = nodeConfig
	return nil
}

func (Embedded) Kubelet(args []string) error {
func (*Embedded) Kubelet(args []string) error {
command := kubelet.NewKubeletCommand(context.Background())
command.SetArgs(args)

Expand All @@ -49,7 +61,7 @@ func (Embedded) Kubelet(args []string) error {
return nil
}

func (Embedded) KubeProxy(args []string) error {
func (*Embedded) KubeProxy(args []string) error {
command := proxy.NewProxyCommand()
command.SetArgs(args)

Expand All @@ -60,12 +72,12 @@ func (Embedded) KubeProxy(args []string) error {
return nil
}

func (Embedded) APIServerHandlers() (authenticator.Request, http.Handler, error) {
// APIServerHandlers blocks until a value is received from app.StartupConfig,
// then returns that value's Authenticator and Handler. The returned error is
// always nil.
func (*Embedded) APIServerHandlers() (authenticator.Request, http.Handler, error) {
	cfg := <-app.StartupConfig
	auth, handler := cfg.Authenticator, cfg.Handler
	return auth, handler, nil
}

func (Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error {
func (*Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error {
command := app.NewAPIServerCommand(ctx.Done())
command.SetArgs(args)

Expand All @@ -77,19 +89,31 @@ func (Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args [
return nil
}

func (Embedded) Scheduler(apiReady <-chan struct{}, args []string) error {
// Scheduler builds the kube-scheduler command with args and launches it in a
// background goroutine; the method itself returns nil immediately.
//
// The goroutine waits for apiReady, then for Bootstrap to have stored the node
// configuration. Unless the embedded cloud controller is disabled, it also
// waits for at least one node without the cloud-provider "uninitialized" taint
// before executing the scheduler. Any failure, including the scheduler
// command returning, terminates the process via logrus.Fatalf.
func (e *Embedded) Scheduler(apiReady <-chan struct{}, args []string) error {
	command := sapp.NewSchedulerCommand()
	command.SetArgs(args)

	go func() {
		<-apiReady
		// wait for Bootstrap to set nodeConfig
		// NOTE(review): this polls e.nodeConfig without synchronization while
		// Bootstrap writes it from another goroutine; consider a channel or
		// sync primitive on the struct to make the hand-off race-free.
		for e.nodeConfig == nil {
			runtime.Gosched()
		}
		// If we're running the embedded cloud controller, wait for it to untaint at least one
		// node (usually, the local node) before starting the scheduler to ensure that it
		// finds a node that is ready to run pods during its initial scheduling loop.
		if !e.nodeConfig.AgentConfig.DisableCCM {
			if err := waitForUntaintedNode(context.Background(), e.nodeConfig.AgentConfig.KubeConfigKubelet); err != nil {
				logrus.Fatalf("failed to wait for untainted node: %v", err)
			}
		}
		logrus.Fatalf("scheduler exited: %v", command.Execute())
	}()

	return nil
}

func (Embedded) ControllerManager(apiReady <-chan struct{}, args []string) error {
func (*Embedded) ControllerManager(apiReady <-chan struct{}, args []string) error {
command := cmapp.NewControllerManagerCommand()
command.SetArgs(args)

Expand All @@ -101,7 +125,7 @@ func (Embedded) ControllerManager(apiReady <-chan struct{}, args []string) error
return nil
}

func (Embedded) CloudControllerManager(ccmRBACReady <-chan struct{}, args []string) error {
func (*Embedded) CloudControllerManager(ccmRBACReady <-chan struct{}, args []string) error {
ccmOptions, err := ccmopt.NewCloudControllerManagerOptions()
if err != nil {
logrus.Fatalf("unable to initialize command options: %v", err)
Expand Down Expand Up @@ -138,3 +162,59 @@ func (Embedded) CloudControllerManager(ccmRBACReady <-chan struct{}, args []stri

return nil
}

// waitForUntaintedNode returns nil as soon as it finds a node that does not
// carry the external cloud provider's "uninitialized" taint.
//
// A client is built from the kubeconfig file at kubeConfig. Existing nodes are
// listed first; if none qualifies, nodes are watched starting from the list's
// ResourceVersion and only Added and Modified events are inspected. An error
// is returned if the client cannot be built, the list or watch call fails, an
// event object is not a *corev1.Node, or the watch channel closes before a
// suitable node is seen.
func waitForUntaintedNode(ctx context.Context, kubeConfig string) error {
	restConfig, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
	if err != nil {
		return err
	}
	coreClient, err := typedcorev1.NewForConfig(restConfig)
	if err != nil {
		return err
	}
	nodeClient := coreClient.Nodes()

	// Fast path: an already-existing node may satisfy the condition.
	nodeList, err := nodeClient.List(ctx, metav1.ListOptions{})
	if err != nil {
		return err
	}
	for i := range nodeList.Items {
		if getCloudTaint(nodeList.Items[i].Spec.Taints) == nil {
			return nil
		}
	}

	// Nothing suitable yet; resume from where the list left off so that no
	// intermediate change is missed.
	watchOpts := metav1.ListOptions{ResourceVersion: nodeList.ListMeta.ResourceVersion}
	watcher, err := nodeClient.Watch(ctx, watchOpts)
	if err != nil {
		return err
	}
	defer watcher.Stop()

	for ev := range watcher.ResultChan() {
		// Only additions and modifications can reveal an untainted node.
		if ev.Type != watch.Added && ev.Type != watch.Modified {
			continue
		}
		node, ok := ev.Object.(*corev1.Node)
		if !ok {
			return fmt.Errorf("could not convert event object to node: %v", ev)
		}
		if getCloudTaint(node.Spec.Taints) == nil {
			return nil
		}
	}

	return errors.New("watch channel closed")
}

// getCloudTaint scans taints for the first entry whose key is
// cloudproviderapi.TaintExternalCloudProvider and returns a pointer to a copy
// of it, or nil when no such taint is present.
// Cribbed from k8s.io/cloud-provider/controllers/node/node_controller.go
func getCloudTaint(taints []corev1.Taint) *corev1.Taint {
	for i := range taints {
		if taints[i].Key != cloudproviderapi.TaintExternalCloudProvider {
			continue
		}
		// Hand back a copy so the caller never aliases the input slice.
		found := taints[i]
		return &found
	}
	return nil
}
5 changes: 4 additions & 1 deletion pkg/server/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ func NewContext(ctx context.Context, cfg string) (*Context, error) {
return nil, err
}

k8s := kubernetes.NewForConfigOrDie(restConfig)
k8s, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, err
}
return &Context{
K3s: k3s.NewFactoryFromConfigOrDie(restConfig),
Helm: helm.NewFactoryFromConfigOrDie(restConfig),
Expand Down