Fix issues with --disable-agent and --egress-selector-mode=pod|cluster #7331

Merged 4 commits on Apr 28, 2023
2 changes: 1 addition & 1 deletion pkg/agent/cridockerd/cridockerd.go
@@ -34,7 +34,7 @@ func Run(ctx context.Context, cfg *config.Node) error {
go func() {
defer func() {
if err := recover(); err != nil {
logrus.Fatalf("cri-dockerd panic: %s", debug.Stack())
logrus.WithField("stack", string(debug.Stack())).Fatalf("cri-dockerd panic: %v", err)
}
}()
logrus.Fatalf("cri-dockerd exited: %v", command.ExecuteContext(ctx))
24 changes: 14 additions & 10 deletions pkg/agent/tunnel/tunnel.go
@@ -101,16 +101,20 @@ func Setup(ctx context.Context, config *daemonconfig.Node, proxy proxy.Proxy) er
close(apiServerReady)
}()

- // Allow the kubelet port, as published via our node object
- go tunnel.setKubeletPort(ctx, apiServerReady)
-
- switch tunnel.mode {
- case daemonconfig.EgressSelectorModeCluster:
- // In Cluster mode, we allow the cluster CIDRs, and any connections to the node's IPs for pods using host network.
- tunnel.clusterAuth(config)
- case daemonconfig.EgressSelectorModePod:
- // In Pod mode, we watch pods assigned to this node, and allow their addresses, as well as ports used by containers with host network.
- go tunnel.watchPods(ctx, apiServerReady, config)
+ // We don't need to run the tunnel authorizer if the container runtime endpoint is /dev/null,
+ // signifying that this is an agentless server that will not register a node.
+ if config.ContainerRuntimeEndpoint != "/dev/null" {
+ // Allow the kubelet port, as published via our node object.
+ go tunnel.setKubeletPort(ctx, apiServerReady)
+
+ switch tunnel.mode {
+ case daemonconfig.EgressSelectorModeCluster:
+ // In Cluster mode, we allow the cluster CIDRs, and any connections to the node's IPs for pods using host network.
+ tunnel.clusterAuth(config)
+ case daemonconfig.EgressSelectorModePod:
+ // In Pod mode, we watch pods assigned to this node, and allow their addresses, as well as ports used by containers with host network.
+ go tunnel.watchPods(ctx, apiServerReady, config)
+ }
}

// The loadbalancer is only disabled when there is a local apiserver. Servers without a local
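For reference, a minimal self-contained sketch of the resulting control flow, assuming the "/dev/null" sentinel and the mode names used above; describeTunnelAuth is a hypothetical helper written for illustration, not part of this PR:

```go
package main

import "fmt"

// describeTunnelAuth summarizes the authorizer setup added above: an agentless
// server (container runtime endpoint "/dev/null", no node registration) skips
// the kubelet-port and pod/cluster authorizers entirely; otherwise the egress
// selector mode decides what the tunnel will allow.
func describeTunnelAuth(containerRuntimeEndpoint, egressSelectorMode string) string {
	if containerRuntimeEndpoint == "/dev/null" {
		return "agentless server: skip node-based tunnel authorization"
	}
	switch egressSelectorMode {
	case "cluster":
		return "allow kubelet port, cluster CIDRs, and node IPs for host-network pods"
	case "pod":
		return "allow kubelet port, plus addresses and ports of pods scheduled to this node"
	default:
		return "allow only the kubelet port published on the node object"
	}
}

func main() {
	fmt.Println(describeTunnelAuth("/dev/null", "cluster"))
	// Example non-agentless endpoint; the exact socket path is illustrative.
	fmt.Println(describeTunnelAuth("unix:///run/k3s/containerd/containerd.sock", "pod"))
}
```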
7 changes: 5 additions & 2 deletions pkg/cli/server/server.go
@@ -530,8 +530,11 @@ func validateNetworkConfiguration(serverConfig server.Config) error {
}

switch serverConfig.ControlConfig.EgressSelectorMode {
- case config.EgressSelectorModeAgent, config.EgressSelectorModeCluster,
- config.EgressSelectorModeDisabled, config.EgressSelectorModePod:
+ case config.EgressSelectorModeCluster, config.EgressSelectorModePod:
+ case config.EgressSelectorModeAgent, config.EgressSelectorModeDisabled:
+ if serverConfig.DisableAgent {
+ logrus.Warn("Webhooks and apiserver aggregation may not function properly without an agent; please set egress-selector-mode to 'cluster' or 'pod'")
+ }
default:
return fmt.Errorf("invalid egress-selector-mode %s", serverConfig.ControlConfig.EgressSelectorMode)
}
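Sketched in isolation, the new validation accepts cluster/pod unconditionally, warns on agent/disabled only when the agent is disabled, and still rejects unknown values. The plain strings below are assumed to match the config.EgressSelectorMode* constants (the CLI flag accepts agent, cluster, pod, and disabled):

```go
package main

import (
	"fmt"
	"log"
)

// validateEgressMode mirrors the switch added above: "cluster" and "pod" are
// always fine; "agent" and "disabled" only draw a warning when the agent is
// disabled, because webhooks and apiserver aggregation then have no tunnel
// to reach pods; anything else is rejected outright.
func validateEgressMode(mode string, disableAgent bool) error {
	switch mode {
	case "cluster", "pod":
	case "agent", "disabled":
		if disableAgent {
			log.Printf("Warning: webhooks and apiserver aggregation may not function properly without an agent; set egress-selector-mode to 'cluster' or 'pod'")
		}
	default:
		return fmt.Errorf("invalid egress-selector-mode %s", mode)
	}
	return nil
}

func main() {
	fmt.Println(validateEgressMode("cluster", true)) // <nil>
	fmt.Println(validateEgressMode("agent", true))   // <nil>, but logs a warning
	fmt.Println(validateEgressMode("bogus", false))  // invalid egress-selector-mode bogus
}
```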
33 changes: 8 additions & 25 deletions pkg/cluster/managed.go
@@ -82,7 +82,7 @@ func (c *Cluster) start(ctx context.Context) error {
if _, err := os.Stat(resetFile); err == nil {
// Before removing reset file we need to delete the node passwd secret in case the node
// password from the previously restored snapshot differs from the current password on disk.
- go c.deleteNodePasswdSecret(ctx)
+ c.config.Runtime.ClusterControllerStarts["node-password-secret-cleanup"] = c.deleteNodePasswdSecret
os.Remove(resetFile)
}

@@ -176,30 +176,13 @@ func (c *Cluster) setupEtcdProxy(ctx context.Context, etcdProxy etcd.Proxy) {

// deleteNodePasswdSecret wipes out the node password secret after restoration
func (c *Cluster) deleteNodePasswdSecret(ctx context.Context) {
- t := time.NewTicker(5 * time.Second)
- defer t.Stop()
- for range t.C {
- nodeName := os.Getenv("NODE_NAME")
- if nodeName == "" {
- logrus.Infof("waiting for node name to be set")
- continue
+ nodeName := os.Getenv("NODE_NAME")
+ secretsClient := c.config.Runtime.Core.Core().V1().Secret()
+ if err := nodepassword.Delete(secretsClient, nodeName); err != nil {
+ if apierrors.IsNotFound(err) {
+ logrus.Debugf("node password secret is not found for node %s", nodeName)
+ return
}
- // the core factory may not yet be initialized so we
- // want to wait until it is so not to evoke a panic.
- if c.config.Runtime.Core == nil {
- logrus.Infof("runtime is not yet initialized")
- continue
- }
- secretsClient := c.config.Runtime.Core.Core().V1().Secret()
- if err := nodepassword.Delete(secretsClient, nodeName); err != nil {
- if apierrors.IsNotFound(err) {
- logrus.Debugf("node password secret is not found for node %s", nodeName)
- return
- }
- logrus.Warnf("failed to delete old node password secret: %v", err)
- continue
- }
- return
logrus.Warnf("failed to delete old node password secret: %v", err)
}

}
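Instead of a goroutine that polls every 5 seconds for NODE_NAME and Runtime.Core, the cleanup is now registered under a name in Runtime.ClusterControllerStarts (the tunnel server's watch below is registered the same way). The sketch that follows shows the general deferred-start pattern, under the assumption that the server invokes each registered callback once the core client factory is ready; the types here are illustrative stand-ins, not the k3s definitions:

```go
package main

import (
	"context"
	"fmt"
)

// controllerStart matches the shape of the registered functions above
// (func(ctx context.Context)); the name is a stand-in for illustration.
type controllerStart func(ctx context.Context)

// runtimeConfig plays the role of config.Runtime with its ClusterControllerStarts map.
type runtimeConfig struct {
	ClusterControllerStarts map[string]controllerStart
}

// startClusterControllers is an assumed sketch of the invocation side: once the
// core factory and node name are available, every registered callback runs once,
// which is why the callbacks themselves no longer need retry or polling loops.
func (r *runtimeConfig) startClusterControllers(ctx context.Context) {
	for name, start := range r.ClusterControllerStarts {
		fmt.Println("starting cluster controller:", name)
		start(ctx)
	}
}

func main() {
	rt := &runtimeConfig{ClusterControllerStarts: map[string]controllerStart{}}

	// Registration, mirroring the diffs in this PR.
	rt.ClusterControllerStarts["node-password-secret-cleanup"] = func(ctx context.Context) {
		fmt.Println("deleting stale node password secret")
	}
	rt.ClusterControllerStarts["tunnel-server"] = func(ctx context.Context) {
		fmt.Println("registering tunnel server node/pod watches")
	}

	// Later, after the apiserver and core clients are up.
	rt.startClusterControllers(context.Background())
}
```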
19 changes: 5 additions & 14 deletions pkg/daemons/control/tunnel.go
@@ -10,7 +10,6 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/daemons/control/proxy"
@@ -45,7 +44,7 @@ func setupTunnel(ctx context.Context, cfg *config.Control) (http.Handler, error)
server: remotedialer.New(authorizer, loggingErrorWriter),
egress: map[string]bool{},
}
- go tunnel.watch(ctx)
+ cfg.Runtime.ClusterControllerStarts["tunnel-server"] = tunnel.watch
return tunnel, nil
}

@@ -112,17 +111,10 @@ func (t *TunnelServer) watch(ctx context.Context) {
return
}

- for {
- if t.config.Runtime.Core != nil {
- t.config.Runtime.Core.Core().V1().Node().OnChange(ctx, version.Program+"-tunnel-server", t.onChangeNode)
- switch t.config.EgressSelectorMode {
- case config.EgressSelectorModeCluster, config.EgressSelectorModePod:
- t.config.Runtime.Core.Core().V1().Pod().OnChange(ctx, version.Program+"-tunnel-server", t.onChangePod)
- }
- return
- }
- logrus.Infof("Tunnel server egress proxy waiting for runtime core to become available")
- time.Sleep(5 * time.Second)
+ t.config.Runtime.Core.Core().V1().Node().OnChange(ctx, version.Program+"-tunnel-server", t.onChangeNode)
+ switch t.config.EgressSelectorMode {
+ case config.EgressSelectorModeCluster, config.EgressSelectorModePod:
+ t.config.Runtime.Core.Core().V1().Pod().OnChange(ctx, version.Program+"-tunnel-server", t.onChangePod)
}
}

Expand Down Expand Up @@ -173,7 +165,6 @@ func (t *TunnelServer) onChangePod(podName string, pod *v1.Pod) (*v1.Pod, error)
}
}
return pod, nil

}

// serveConnect attempts to handle the HTTP CONNECT request by dialing
12 changes: 6 additions & 6 deletions pkg/daemons/executor/embed.go
@@ -57,7 +57,7 @@ func (e *Embedded) Kubelet(ctx context.Context, args []string) error {
go func() {
defer func() {
if err := recover(); err != nil {
logrus.WithField("stack", debug.Stack()).Fatalf("kubelet panic: %v", err)
logrus.WithField("stack", string(debug.Stack())).Fatalf("kubelet panic: %v", err)
}
}()
// The embedded executor doesn't need the kubelet to come up to host any components, and
@@ -79,7 +79,7 @@ func (*Embedded) KubeProxy(ctx context.Context, args []string) error {
go func() {
defer func() {
if err := recover(); err != nil {
logrus.WithField("stack", debug.Stack()).Fatalf("kube-proxy panic: %v", err)
logrus.WithField("stack", string(debug.Stack())).Fatalf("kube-proxy panic: %v", err)
}
}()
logrus.Fatalf("kube-proxy exited: %v", command.ExecuteContext(ctx))
@@ -101,7 +101,7 @@ func (*Embedded) APIServer(ctx context.Context, etcdReady <-chan struct{}, args
<-etcdReady
defer func() {
if err := recover(); err != nil {
logrus.WithField("stack", debug.Stack()).Fatalf("apiserver panic: %v", err)
logrus.WithField("stack", string(debug.Stack())).Fatalf("apiserver panic: %v", err)
}
}()
logrus.Fatalf("apiserver exited: %v", command.ExecuteContext(ctx))
@@ -130,7 +130,7 @@ func (e *Embedded) Scheduler(ctx context.Context, apiReady <-chan struct{}, args
}
defer func() {
if err := recover(); err != nil {
logrus.WithField("stack", debug.Stack()).Fatalf("scheduler panic: %v", err)
logrus.WithField("stack", string(debug.Stack())).Fatalf("scheduler panic: %v", err)
}
}()
logrus.Fatalf("scheduler exited: %v", command.ExecuteContext(ctx))
@@ -147,7 +147,7 @@ func (*Embedded) ControllerManager(ctx context.Context, apiReady <-chan struct{}
<-apiReady
defer func() {
if err := recover(); err != nil {
logrus.WithField("stack", debug.Stack()).Fatalf("controller-manager panic: %v", err)
logrus.WithField("stack", string(debug.Stack())).Fatalf("controller-manager panic: %v", err)
}
}()
logrus.Fatalf("controller-manager exited: %v", command.ExecuteContext(ctx))
@@ -180,7 +180,7 @@ func (*Embedded) CloudControllerManager(ctx context.Context, ccmRBACReady <-chan
<-ccmRBACReady
defer func() {
if err := recover(); err != nil {
logrus.WithField("stack", debug.Stack()).Fatalf("cloud-controller-manager panic: %v", err)
logrus.WithField("stack", string(debug.Stack())).Fatalf("cloud-controller-manager panic: %v", err)
}
}()
logrus.Errorf("cloud-controller-manager exited: %v", command.ExecuteContext(ctx))
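Every hunk in this file makes the same correction: runtime/debug.Stack() returns a []byte, and logging it directly as a logrus field renders it as a byte-slice dump (a list of numbers, or base64 with the JSON formatter) rather than a readable trace, so it is converted with string() first. A minimal standalone example of the corrected recover pattern:

```go
package main

import (
	"runtime/debug"

	"github.com/sirupsen/logrus"
)

// run demonstrates the recover pattern used above: capture the panic value
// and attach the stack trace as a readable string field before exiting.
func run() {
	defer func() {
		if err := recover(); err != nil {
			// string(debug.Stack()) keeps the trace human-readable;
			// logging the raw []byte would print numbers or base64.
			logrus.WithField("stack", string(debug.Stack())).Fatalf("component panic: %v", err)
		}
	}()
	panic("boom")
}

func main() {
	run()
}
```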
7 changes: 5 additions & 2 deletions pkg/server/server.go
@@ -168,8 +168,11 @@ func apiserverControllers(ctx context.Context, sc *Context, config *Config) {
panic(errors.Wrapf(err, "failed to start %s leader controller", util.GetFunctionName(controller)))
}
}

+ // Re-run context startup after core and leader-elected controllers have started. Additional
+ // informer caches may need to start for the newly added OnChange callbacks.
if err := sc.Start(ctx); err != nil {
- panic(err)
+ panic(errors.Wrap(err, "failed to start wrangler controllers"))
}
}

@@ -178,7 +181,7 @@ func apiserverControllers(ctx context.Context, sc *Context, config *Config) {
func runOrDie(ctx context.Context, name string, cb leader.Callback) {
defer func() {
if err := recover(); err != nil {
logrus.WithField("stack", debug.Stack()).Fatalf("%s controller panic: %v", name, err)
logrus.WithField("stack", string(debug.Stack())).Fatalf("%s controller panic: %v", name, err)
}
}()
cb(ctx)
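The added comment states the reason for calling sc.Start a second time: a controller factory only starts informer caches for resource types that have handlers registered when Start is invoked, so handlers added by the leader-elected controllers need another Start pass. A rough sketch of that behavior with a hypothetical factory type (not the wrangler API):

```go
package main

import (
	"context"
	"fmt"
)

// factory is a hypothetical stand-in for a controller/informer factory:
// Start only launches informers for types that currently have handlers.
type factory struct {
	handlers []string
	started  map[string]bool
}

func (f *factory) OnChange(resource string) { f.handlers = append(f.handlers, resource) }

func (f *factory) Start(ctx context.Context) error {
	for _, r := range f.handlers {
		if !f.started[r] {
			f.started[r] = true
			fmt.Println("starting informer cache for", r)
		}
	}
	return nil
}

func main() {
	ctx := context.Background()
	f := &factory{started: map[string]bool{}}

	f.OnChange("nodes")
	_ = f.Start(ctx) // first start: nodes informer

	// Leader-elected controllers register more handlers afterwards...
	f.OnChange("pods")

	// ...so Start must be re-run to bring up the newly required caches.
	_ = f.Start(ctx) // second start: pods informer
}
```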
2 changes: 1 addition & 1 deletion tests/e2e/rotateca/rotateca_test.go
@@ -1,4 +1,4 @@
- package secretsencryption
+ package rotateca

import (
"flag"
71 changes: 71 additions & 0 deletions tests/e2e/startup/startup_test.go
@@ -175,6 +175,77 @@ var _ = Describe("Various Startup Configurations", Ordered, func() {
Expect(err).NotTo(HaveOccurred())
})
})
Context("Verify disable-agent and egress-selector-mode flags", func() {
It("Starts K3s with no issues", func() {
disableAgentYAML := "disable-agent: true\negress-selector-mode: cluster"
err := StartK3sCluster(append(serverNodeNames, agentNodeNames...), disableAgentYAML, "")
Expect(err).NotTo(HaveOccurred(), e2e.GetVagrantLog(err))

fmt.Println("CLUSTER CONFIG")
fmt.Println("OS:", *nodeOS)
fmt.Println("Server Nodes:", serverNodeNames)
fmt.Println("Agent Nodes:", agentNodeNames)
kubeConfigFile, err = e2e.GenKubeConfigFile(serverNodeNames[0])
Expect(err).NotTo(HaveOccurred())
})

It("Checks node and pod status", func() {
fmt.Printf("\nFetching node status\n")
Eventually(func(g Gomega) {
nodes, err := e2e.ParseNodes(kubeConfigFile, false)
g.Expect(err).NotTo(HaveOccurred())
for _, node := range nodes {
g.Expect(node.Status).Should(Equal("Ready"))
}
}, "620s", "5s").Should(Succeed())
_, _ = e2e.ParseNodes(kubeConfigFile, true)

fmt.Printf("\nFetching pods status\n")
Eventually(func(g Gomega) {
pods, err := e2e.ParsePods(kubeConfigFile, false)
g.Expect(err).NotTo(HaveOccurred())
for _, pod := range pods {
if strings.Contains(pod.Name, "helm-install") {
g.Expect(pod.Status).Should(Equal("Completed"), pod.Name)
} else {
g.Expect(pod.Status).Should(Equal("Running"), pod.Name)
}
}
}, "620s", "5s").Should(Succeed())
_, _ = e2e.ParsePods(kubeConfigFile, true)
})

It("Returns pod metrics", func() {
cmd := "kubectl top pod -A"
Eventually(func() error {
_, err := e2e.RunCmdOnNode(cmd, serverNodeNames[0])
return err
}, "620s", "5s").Should(Succeed())
})

It("Returns node metrics", func() {
cmd := "kubectl top node"
_, err := e2e.RunCmdOnNode(cmd, serverNodeNames[0])
Expect(err).NotTo(HaveOccurred())
})

It("Runs an interactive command a pod", func() {
cmd := "kubectl run busybox --rm -it --restart=Never --image=rancher/mirrored-library-busybox:1.34.1 -- uname -a"
_, err := e2e.RunCmdOnNode(cmd, serverNodeNames[0])
Expect(err).NotTo(HaveOccurred())
})

It("Collects logs from a pod", func() {
cmd := "kubectl logs -n kube-system -l app.kubernetes.io/name=traefik -c traefik"
_, err := e2e.RunCmdOnNode(cmd, serverNodeNames[0])
Expect(err).NotTo(HaveOccurred())
})

It("Kills the cluster", func() {
err := KillK3sCluster(append(serverNodeNames, agentNodeNames...))
Expect(err).NotTo(HaveOccurred())
})
})
})

var failed bool