diff --git a/pkg/agent/flannel/setup.go b/pkg/agent/flannel/setup.go
index 79b16ec062e4..0600ce81ecf1 100644
--- a/pkg/agent/flannel/setup.go
+++ b/pkg/agent/flannel/setup.go
@@ -2,20 +2,21 @@ package flannel
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"net"
 	"os"
 	"path/filepath"
 	"strings"
-	"time"
 
+	"github.com/pkg/errors"
 	"github.com/rancher/k3s/pkg/agent/util"
 	"github.com/rancher/k3s/pkg/daemons/config"
 	"github.com/rancher/k3s/pkg/version"
 	"github.com/sirupsen/logrus"
+	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	v1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	utilsnet "k8s.io/utils/net"
 )
 
@@ -87,27 +88,14 @@ func Prepare(ctx context.Context, nodeConfig *config.Node) error {
 	return createFlannelConf(nodeConfig)
 }
 
-func Run(ctx context.Context, nodeConfig *config.Node, nodes v1.NodeInterface) error {
-	nodeName := nodeConfig.AgentConfig.NodeName
-
-	for {
-		node, err := nodes.Get(ctx, nodeName, metav1.GetOptions{})
-		if err == nil && node.Spec.PodCIDR != "" {
-			break
-		}
-		if err == nil {
-			logrus.Info("Waiting for node " + nodeName + " CIDR not assigned yet")
-		} else {
-			logrus.Infof("Waiting for node %s: %v", nodeName, err)
-		}
-		time.Sleep(2 * time.Second)
+func Run(ctx context.Context, nodeConfig *config.Node, nodes typedcorev1.NodeInterface) error {
+	if err := waitForPodCIDR(ctx, nodeConfig.AgentConfig.NodeName, nodes); err != nil {
+		return errors.Wrap(err, "failed to wait for PodCIDR assignment")
 	}
-	logrus.Info("Node CIDR assigned for: " + nodeName)
 
 	netMode, err := findNetMode(nodeConfig.AgentConfig.ClusterCIDRs)
 	if err != nil {
-		logrus.Fatalf("Error checking netMode")
-		return err
+		return errors.Wrap(err, "failed to check netMode for flannel")
 	}
 	go func() {
 		err := flannel(ctx, nodeConfig.FlannelIface, nodeConfig.FlannelConf, nodeConfig.AgentConfig.KubeConfigKubelet, netMode)
@@ -119,6 +107,28 @@ func Run(ctx context.Context, nodeConfig *config.Node, nodes v1.NodeInterface) e
 	return nil
 }
 
+// waitForPodCIDR watches nodes with this node's name, and returns when the PodCIDR has been set.
+func waitForPodCIDR(ctx context.Context, nodeName string, nodes typedcorev1.NodeInterface) error {
+	fieldSelector := fields.Set{metav1.ObjectNameField: nodeName}.String()
+	watch, err := nodes.Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
+	if err != nil {
+		return err
+	}
+	defer watch.Stop()
+
+	for ev := range watch.ResultChan() {
+		node, ok := ev.Object.(*corev1.Node)
+		if !ok {
+			return fmt.Errorf("could not convert event object to node: %v", ev)
+		}
+		if node.Spec.PodCIDR != "" {
+			break
+		}
+	}
+	logrus.Info("PodCIDR assigned for node " + nodeName)
+	return nil
+}
+
 func createCNIConf(dir string) error {
 	if dir == "" {
 		return nil
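Note: the rewritten flannel.Run above replaces a two-second polling loop with a single watch, filtered server-side to this node's name, that blocks until a PodCIDR appears. The same pattern can be tried standalone; the sketch below is illustrative only, and the kubeconfig path and node name in it are assumptions rather than values taken from this patch:

    package main

    import (
        "context"
        "fmt"

        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/fields"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/clientcmd"
    )

    func main() {
        // Assumed kubeconfig path and node name, for illustration only.
        restConfig, err := clientcmd.BuildConfigFromFlags("", "/etc/rancher/k3s/k3s.yaml")
        if err != nil {
            panic(err)
        }
        client := kubernetes.NewForConfigOrDie(restConfig)

        // Server-side filter: only events for the named node are delivered.
        selector := fields.Set{metav1.ObjectNameField: "my-node"}.String()
        w, err := client.CoreV1().Nodes().Watch(context.Background(), metav1.ListOptions{FieldSelector: selector})
        if err != nil {
            panic(err)
        }
        defer w.Stop()

        // Block until the node reports a PodCIDR, as waitForPodCIDR does.
        for ev := range w.ResultChan() {
            if node, ok := ev.Object.(*corev1.Node); ok && node.Spec.PodCIDR != "" {
                fmt.Println("PodCIDR assigned:", node.Spec.PodCIDR)
                return
            }
        }
    }

Filtering on metadata.name means the API server pushes only the relevant events, instead of the client fetching every node on each poll cycle.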
diff --git a/pkg/agent/run.go b/pkg/agent/run.go
index 41fee59cb070..bbfae253c81e 100644
--- a/pkg/agent/run.go
+++ b/pkg/agent/run.go
@@ -31,11 +31,13 @@ import (
 	"github.com/rancher/k3s/pkg/rootless"
 	"github.com/rancher/k3s/pkg/util"
 	"github.com/sirupsen/logrus"
+	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/kubernetes"
-	v1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/tools/clientcmd"
 	app2 "k8s.io/kubernetes/cmd/kube-proxy/app"
 	kubeproxyconfig "k8s.io/kubernetes/pkg/proxy/apis/config"
@@ -137,16 +139,16 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
 
 	util.WaitForAPIServerReady(coreClient, 30*time.Second)
 
+	if err := configureNode(ctx, &nodeConfig.AgentConfig, coreClient.CoreV1().Nodes()); err != nil {
+		return err
+	}
+
 	if !nodeConfig.NoFlannel {
 		if err := flannel.Run(ctx, nodeConfig, coreClient.CoreV1().Nodes()); err != nil {
 			return err
 		}
 	}
 
-	if err := configureNode(ctx, &nodeConfig.AgentConfig, coreClient.CoreV1().Nodes()); err != nil {
-		return err
-	}
-
 	if !nodeConfig.AgentConfig.DisableNPC {
 		if err := netpol.Run(ctx, nodeConfig); err != nil {
 			return err
@@ -298,17 +300,18 @@ func validateCgroupsV2() error {
 	return nil
 }
 
-func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes v1.NodeInterface) error {
-	count := 0
-	for {
-		node, err := nodes.Get(ctx, agentConfig.NodeName, metav1.GetOptions{})
-		if err != nil {
-			if count%30 == 0 {
-				logrus.Infof("Waiting for kubelet to be ready on node %s: %v", agentConfig.NodeName, err)
-			}
-			count++
-			time.Sleep(1 * time.Second)
-			continue
+func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes typedcorev1.NodeInterface) error {
+	fieldSelector := fields.Set{metav1.ObjectNameField: agentConfig.NodeName}.String()
+	watch, err := nodes.Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
+	if err != nil {
+		return err
+	}
+	defer watch.Stop()
+
+	for ev := range watch.ResultChan() {
+		node, ok := ev.Object.(*corev1.Node)
+		if !ok {
+			return fmt.Errorf("could not convert event object to node: %v", ev)
 		}
 
 		updateNode := false
@@ -338,12 +341,7 @@ func configureNode(ctx context.Context, agentConfig *daemonconfig.Agent, nodes v
 		if updateNode {
 			if _, err := nodes.Update(ctx, node, metav1.UpdateOptions{}); err != nil {
 				logrus.Infof("Failed to update node %s: %v", agentConfig.NodeName, err)
-				select {
-				case <-ctx.Done():
-					return ctx.Err()
-				case <-time.After(time.Second):
-					continue
-				}
+				continue
 			}
 			logrus.Infof("labels have been set successfully on node: %s", agentConfig.NodeName)
 		} else {
diff --git a/pkg/agent/tunnel/tunnel.go b/pkg/agent/tunnel/tunnel.go
index fb6806d2ad86..0503152eaa58 100644
--- a/pkg/agent/tunnel/tunnel.go
+++ b/pkg/agent/tunnel/tunnel.go
@@ -103,7 +103,7 @@ func Setup(ctx context.Context, config *config.Node, proxy proxy.Proxy) error {
 			}
 			endpoint, ok := ev.Object.(*v1.Endpoints)
 			if !ok {
-				logrus.Errorf("Tunnel could not case event object to endpoint: %v", ev)
+				logrus.Errorf("Tunnel could not convert event object to endpoint: %v", ev)
 				continue watching
 			}
 
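Note: configureNode now has the same watch-driven shape as waitForPodCIDR, and its call is moved ahead of flannel.Run so the node's labels and annotations are reconciled before the CNI starts. One caveat of ranging over ResultChan() directly is that the channel also carries Deleted, Bookmark, and Error events; a hypothetical, more defensive variant (not what this patch does) could switch on the event type first:

    package main

    import (
        "fmt"

        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/watch"
    )

    // waitForNode is a hypothetical, more defensive variant of the loops in
    // this patch: it switches on the event type before type-asserting, so a
    // watch.Error event is surfaced as a watch error rather than a failed
    // conversion, and Deleted/Bookmark events are simply skipped.
    func waitForNode(w watch.Interface, done func(*corev1.Node) bool) error {
        defer w.Stop()
        for ev := range w.ResultChan() {
            switch ev.Type {
            case watch.Added, watch.Modified:
                node, ok := ev.Object.(*corev1.Node)
                if !ok {
                    return fmt.Errorf("could not convert event object to node: %v", ev)
                }
                if done(node) {
                    return nil
                }
            case watch.Error:
                return fmt.Errorf("watch error: %v", ev.Object)
            }
        }
        return fmt.Errorf("watch channel closed before condition was met")
    }

    func main() {
        // A fake watcher stands in for the API server so the sketch runs anywhere.
        fw := watch.NewFake()
        go fw.Add(&corev1.Node{Spec: corev1.NodeSpec{PodCIDR: "10.42.0.0/24"}})
        err := waitForNode(fw, func(n *corev1.Node) bool { return n.Spec.PodCIDR != "" })
        fmt.Println("done:", err)
    }

Against a real cluster, w would come from nodes.Watch with a field selector, exactly as in the patch.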
diff --git a/pkg/daemons/executor/embed.go b/pkg/daemons/executor/embed.go
index f4d908712e52..accc8f79ec78 100644
--- a/pkg/daemons/executor/embed.go
+++ b/pkg/daemons/executor/embed.go
@@ -4,16 +4,25 @@ package executor
 
 import (
 	"context"
+	"errors"
+	"fmt"
 	"net/http"
+	"runtime"
 
 	"github.com/rancher/k3s/pkg/cli/cmds"
 	daemonconfig "github.com/rancher/k3s/pkg/daemons/config"
 	"github.com/rancher/k3s/pkg/version"
 	"github.com/sirupsen/logrus"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/apiserver/pkg/authentication/authenticator"
+	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/tools/clientcmd"
 	ccm "k8s.io/cloud-provider"
 	cloudprovider "k8s.io/cloud-provider"
+	cloudproviderapi "k8s.io/cloud-provider/api"
 	ccmapp "k8s.io/cloud-provider/app"
 	cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
 	ccmopt "k8s.io/cloud-provider/options"
@@ -29,16 +38,19 @@ import (
 )
 
 func init() {
-	executor = Embedded{}
+	executor = &Embedded{}
 }
 
-type Embedded struct{}
+type Embedded struct {
+	nodeConfig *daemonconfig.Node
+}
 
-func (Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error {
+func (e *Embedded) Bootstrap(ctx context.Context, nodeConfig *daemonconfig.Node, cfg cmds.Agent) error {
+	e.nodeConfig = nodeConfig
 	return nil
 }
 
-func (Embedded) Kubelet(args []string) error {
+func (*Embedded) Kubelet(args []string) error {
 	command := kubelet.NewKubeletCommand(context.Background())
 	command.SetArgs(args)
 
@@ -49,7 +61,7 @@ func (Embedded) Kubelet(args []string) error {
 	return nil
 }
 
-func (Embedded) KubeProxy(args []string) error {
+func (*Embedded) KubeProxy(args []string) error {
 	command := proxy.NewProxyCommand()
 	command.SetArgs(args)
 
@@ -60,12 +72,12 @@ func (Embedded) KubeProxy(args []string) error {
 	return nil
 }
 
-func (Embedded) APIServerHandlers() (authenticator.Request, http.Handler, error) {
+func (*Embedded) APIServerHandlers() (authenticator.Request, http.Handler, error) {
 	startupConfig := <-app.StartupConfig
 	return startupConfig.Authenticator, startupConfig.Handler, nil
 }
 
-func (Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error {
+func (*Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args []string) error {
 	command := app.NewAPIServerCommand(ctx.Done())
 	command.SetArgs(args)
 
@@ -77,19 +89,31 @@ func (Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args [
 	return nil
 }
 
-func (Embedded) Scheduler(apiReady <-chan struct{}, args []string) error {
+func (e *Embedded) Scheduler(apiReady <-chan struct{}, args []string) error {
 	command := sapp.NewSchedulerCommand()
 	command.SetArgs(args)
 
 	go func() {
 		<-apiReady
+		// wait for Bootstrap to set nodeConfig
+		for e.nodeConfig == nil {
+			runtime.Gosched()
+		}
+		// If we're running the embedded cloud controller, wait for it to untaint at least one
+		// node (usually, the local node) before starting the scheduler to ensure that it
+		// finds a node that is ready to run pods during its initial scheduling loop.
+		if !e.nodeConfig.AgentConfig.DisableCCM {
+			if err := waitForUntaintedNode(context.Background(), e.nodeConfig.AgentConfig.KubeConfigKubelet); err != nil {
+				logrus.Fatalf("failed to wait for untainted node: %v", err)
+			}
+		}
 		logrus.Fatalf("scheduler exited: %v", command.Execute())
 	}()
 
 	return nil
 }
 
-func (Embedded) ControllerManager(apiReady <-chan struct{}, args []string) error {
+func (*Embedded) ControllerManager(apiReady <-chan struct{}, args []string) error {
 	command := cmapp.NewControllerManagerCommand()
 	command.SetArgs(args)
 
@@ -101,7 +125,7 @@ func (Embedded) ControllerManager(apiReady <-chan struct{}, args []string) error
 	return nil
 }
 
-func (Embedded) CloudControllerManager(ccmRBACReady <-chan struct{}, args []string) error {
+func (*Embedded) CloudControllerManager(ccmRBACReady <-chan struct{}, args []string) error {
 	ccmOptions, err := ccmopt.NewCloudControllerManagerOptions()
 	if err != nil {
 		logrus.Fatalf("unable to initialize command options: %v", err)
@@ -138,3 +162,59 @@ func (Embedded) CloudControllerManager(ccmRBACReady <-chan struct{}, args []stri
 
 	return nil
 }
+
+// waitForUntaintedNode watches nodes, waiting to find one not tainted as
+// uninitialized by the external cloud provider.
+func waitForUntaintedNode(ctx context.Context, kubeConfig string) error {
+	restConfig, err := clientcmd.BuildConfigFromFlags("", kubeConfig)
+	if err != nil {
+		return err
+	}
+	coreClient, err := typedcorev1.NewForConfig(restConfig)
+	if err != nil {
+		return err
+	}
+
+	// List first, to see if there's an existing node that will do
+	nodes, err := coreClient.Nodes().List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return err
+	}
+	for _, node := range nodes.Items {
+		if taint := getCloudTaint(node.Spec.Taints); taint == nil {
+			return nil
+		}
+	}
+
+	// List didn't give us an existing node, start watching at whatever ResourceVersion the list left off at.
+	watcher, err := coreClient.Nodes().Watch(ctx, metav1.ListOptions{ResourceVersion: nodes.ListMeta.ResourceVersion})
+	if err != nil {
+		return err
+	}
+	defer watcher.Stop()
+
+	for ev := range watcher.ResultChan() {
+		if ev.Type == watch.Added || ev.Type == watch.Modified {
+			node, ok := ev.Object.(*corev1.Node)
+			if !ok {
+				return fmt.Errorf("could not convert event object to node: %v", ev)
+			}
+			if taint := getCloudTaint(node.Spec.Taints); taint == nil {
+				return nil
+			}
+		}
+	}
+
+	return errors.New("watch channel closed")
+}
+
+// getCloudTaint returns the external cloud provider taint, if present.
+// Cribbed from k8s.io/cloud-provider/controllers/node/node_controller.go
+func getCloudTaint(taints []corev1.Taint) *corev1.Taint {
+	for _, taint := range taints {
+		if taint.Key == cloudproviderapi.TaintExternalCloudProvider {
+			return &taint
+		}
+	}
+	return nil
+}
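Note: the runtime.Gosched() spin in Scheduler works because Bootstrap is always invoked before the scheduler goroutine reads e.nodeConfig, but the handoff can be made explicit. A sketch of one alternative using a channel, with stand-in names and types rather than k3s code:

    package main

    import "fmt"

    // Hypothetical alternative to the runtime.Gosched() busy-wait above:
    // Bootstrap closes a channel once nodeConfig is stored, and Scheduler
    // blocks on it. The channel close establishes a happens-before edge,
    // so no spinning or polling is needed.
    type embedded struct {
        nodeConfig  *string // stand-in for *daemonconfig.Node
        configReady chan struct{}
    }

    func (e *embedded) Bootstrap(cfg *string) {
        e.nodeConfig = cfg
        close(e.configReady) // publish: nodeConfig is now safe to read
    }

    func (e *embedded) Scheduler() {
        <-e.configReady // blocks until Bootstrap has run
        fmt.Println("scheduler sees config:", *e.nodeConfig)
    }

    func main() {
        e := &embedded{configReady: make(chan struct{})}
        cfg := "example"
        go e.Bootstrap(&cfg)
        e.Scheduler()
    }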
diff --git a/pkg/server/context.go b/pkg/server/context.go
index fac72708db6a..d30f66486c21 100644
--- a/pkg/server/context.go
+++ b/pkg/server/context.go
@@ -59,7 +59,10 @@ func NewContext(ctx context.Context, cfg string) (*Context, error) {
 		return nil, err
 	}
 
-	k8s := kubernetes.NewForConfigOrDie(restConfig)
+	k8s, err := kubernetes.NewForConfig(restConfig)
+	if err != nil {
+		return nil, err
+	}
 	return &Context{
 		K3s:  k3s.NewFactoryFromConfigOrDie(restConfig),
 		Helm: helm.NewFactoryFromConfigOrDie(restConfig),
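Note: the taint consulted by getCloudTaint is the standard node.cloudprovider.kubernetes.io/uninitialized taint, which the cloud controller manager removes once it has initialized a node; until at least one node sheds it, the scheduler has nowhere to place pods, which is the startup race the Scheduler change above avoids. A small self-contained sketch of the helper's behavior, mirroring the function added in embed.go:

    package main

    import (
        "fmt"

        corev1 "k8s.io/api/core/v1"
        cloudproviderapi "k8s.io/cloud-provider/api"
    )

    // Same shape as the helper added in embed.go above.
    func getCloudTaint(taints []corev1.Taint) *corev1.Taint {
        for _, taint := range taints {
            if taint.Key == cloudproviderapi.TaintExternalCloudProvider {
                return &taint
            }
        }
        return nil
    }

    func main() {
        tainted := []corev1.Taint{{
            // "node.cloudprovider.kubernetes.io/uninitialized"
            Key:    cloudproviderapi.TaintExternalCloudProvider,
            Value:  "true",
            Effect: corev1.TaintEffectNoSchedule,
        }}
        fmt.Println(getCloudTaint(tainted) != nil) // true: node still uninitialized
        fmt.Println(getCloudTaint(nil) != nil)     // false: node is ready for pods
    }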