Skip to content

Commit

Permalink
Merge pull request #772 from k0sproject/validate-connection-vrrp-ip
Browse files Browse the repository at this point in the history
CPLB virtual address fall back for kube API URL
  • Loading branch information
kke authored Oct 22, 2024
2 parents b8a2fd0 + fd0ba50 commit 4c6450d
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 126 deletions.
43 changes: 1 addition & 42 deletions phase/configure_k0s.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (p *ConfigureK0s) generateDefaultConfig() (string, error) {
func (p *ConfigureK0s) Run() error {
controllers := p.Config.Spec.Hosts.Controllers().Filter(func(h *cluster.Host) bool {
return !h.Reset && len(h.Metadata.K0sNewConfig) > 0
})
})
return p.parallelDo(controllers, p.configureK0s)
}

Expand Down Expand Up @@ -270,19 +270,6 @@ func (p *ConfigureK0s) configureK0s(h *cluster.Host) error {
return nil
}

func addUnlessExist(slice *[]string, s string) {
var found bool
for _, v := range *slice {
if v == s {
found = true
break
}
}
if !found {
*slice = append(*slice, s)
}
}

func (p *ConfigureK0s) configFor(h *cluster.Host) (string, error) {
var cfg dig.Mapping

Expand All @@ -298,40 +285,12 @@ func (p *ConfigureK0s) configFor(h *cluster.Host) (string, error) {
cfg = p.newBaseConfig.Dup()
}

var sans []string

var addr string
if h.PrivateAddress != "" {
addr = h.PrivateAddress
} else {
addr = h.Address()
}
cfg.DigMapping("spec", "api")["address"] = addr
addUnlessExist(&sans, addr)

oldsans := cfg.Dig("spec", "api", "sans")
switch oldsans := oldsans.(type) {
case []interface{}:
for _, v := range oldsans {
if s, ok := v.(string); ok {
addUnlessExist(&sans, s)
}
}
case []string:
for _, v := range oldsans {
addUnlessExist(&sans, v)
}
}

var controllers cluster.Hosts = p.Config.Spec.Hosts.Controllers()
for _, c := range controllers {
addUnlessExist(&sans, c.Address())
if c.PrivateAddress != "" {
addUnlessExist(&sans, c.PrivateAddress)
}
}
addUnlessExist(&sans, "127.0.0.1")
cfg.DigMapping("spec", "api")["sans"] = sans

if cfg.Dig("spec", "storage", "etcd", "peerAddress") != nil || h.PrivateAddress != "" {
cfg.DigMapping("spec", "storage", "etcd")["peerAddress"] = addr
Expand Down
44 changes: 1 addition & 43 deletions phase/get_kubeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@ package phase

import (
"fmt"
"strings"

"github.com/alessio/shellescape"
"github.com/k0sproject/dig"
"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster"
"github.com/k0sproject/rig/exec"
"gopkg.in/yaml.v2"
"k8s.io/client-go/tools/clientcmd"
)

Expand All @@ -31,24 +28,6 @@ var readKubeconfig = func(h *cluster.Host) (string, error) {
return output, nil
}

var k0sConfig = func(h *cluster.Host) (dig.Mapping, error) {
cfgContent, err := h.Configurer.ReadFile(h, h.Configurer.K0sConfigPath())
if err != nil {
return nil, fmt.Errorf("read k0s config from host: %w", err)
}

var cfg dig.Mapping
if err := yaml.Unmarshal([]byte(cfgContent), &cfg); err != nil {
return nil, fmt.Errorf("unmarshal k0s config: %w", err)
}

if err != nil {
return nil, fmt.Errorf("parse k0s config: %w", err)
}

return cfg, nil
}

func (p *GetKubeconfig) DryRun() error {
p.DryMsg(p.Config.Spec.Hosts.Controllers()[0], "get admin kubeconfig")
return nil
Expand All @@ -58,34 +37,13 @@ func (p *GetKubeconfig) DryRun() error {
func (p *GetKubeconfig) Run() error {
h := p.Config.Spec.Hosts.Controllers()[0]

cfg, err := k0sConfig(h)
if err != nil {
return err
}

output, err := readKubeconfig(h)
if err != nil {
return fmt.Errorf("read kubeconfig from host: %w", err)
}

if p.APIAddress == "" {
// the controller admin.conf is aways pointing to localhost, thus we need to change the address
// something usable from outside
address := h.Address()
if a, ok := cfg.Dig("spec", "api", "externalAddress").(string); ok && a != "" {
address = a
}

port := 6443
if p, ok := cfg.Dig("spec", "api", "port").(int); ok && p != 0 {
port = p
}

if strings.Contains(address, ":") {
p.APIAddress = fmt.Sprintf("https://[%s]:%d", address, port)
} else {
p.APIAddress = fmt.Sprintf("https://%s:%d", address, port)
}
p.APIAddress = p.Config.Spec.KubeAPIURL()
}

cfgString, err := kubeConfig(output, p.Config.Metadata.Name, p.APIAddress)
Expand Down
6 changes: 0 additions & 6 deletions phase/get_kubeconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,6 @@ func TestGetKubeconfig(t *testing.T) {
defer func() { readKubeconfig = origReadKubeconfig }()
readKubeconfig = fakeReader

origK0sConfig := k0sConfig
defer func() { k0sConfig = origK0sConfig }()
k0sConfig = func(h *cluster.Host) (dig.Mapping, error) {
return cfg.Spec.K0s.Config, nil
}

p := GetKubeconfig{GenericPhase: GenericPhase{Config: cfg}}
require.NoError(t, p.Run())
conf, err := clientcmd.Load([]byte(cfg.Metadata.Kubeconfig))
Expand Down
8 changes: 1 addition & 7 deletions phase/initialize_k0s.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ func (p *InitializeK0s) Run() error {
}
return nil
})

if err != nil {
return err
}
Expand All @@ -116,18 +115,13 @@ func (p *InitializeK0s) Run() error {
return err
}

port := 6443
if p, ok := p.Config.Spec.K0s.Config.Dig("spec", "api", "port").(int); ok {
port = p
}
log.Infof("%s: waiting for kubernetes api to respond", h)
if err := retry.Timeout(context.TODO(), retry.DefaultTimeout, node.KubeAPIReadyFunc(h, port)); err != nil {
if err := retry.Timeout(context.TODO(), retry.DefaultTimeout, node.KubeAPIReadyFunc(h, p.Config)); err != nil {
return err
}

return nil
})

if err != nil {
return err
}
Expand Down
9 changes: 2 additions & 7 deletions phase/install_controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (p *InstallControllers) After() error {

// Run the phase
func (p *InstallControllers) Run() error {
url := p.Config.Spec.KubeAPIURL()
url := p.Config.Spec.InternalKubeAPIURL()
healthz := fmt.Sprintf("%s/healthz", url)

err := p.parallelDo(p.hosts, func(h *cluster.Host) error {
Expand Down Expand Up @@ -191,11 +191,6 @@ func (p *InstallControllers) Run() error {
}

func (p *InstallControllers) waitJoined(h *cluster.Host) error {
port := 6443
if p, ok := p.Config.Spec.K0s.Config.Dig("spec", "api", "port").(int); ok {
port = p
}

log.Infof("%s: waiting for kubernetes api to respond", h)
return retry.Timeout(context.TODO(), retry.DefaultTimeout, node.KubeAPIReadyFunc(h, port))
return retry.Timeout(context.TODO(), retry.DefaultTimeout, node.KubeAPIReadyFunc(h, p.Config))
}
2 changes: 1 addition & 1 deletion phase/install_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (p *InstallWorkers) After() error {

// Run the phase
func (p *InstallWorkers) Run() error {
url := p.Config.Spec.KubeAPIURL()
url := p.Config.Spec.InternalKubeAPIURL()
healthz := fmt.Sprintf("%s/healthz", url)

err := p.parallelDo(p.hosts, func(h *cluster.Host) error {
Expand Down
6 changes: 1 addition & 5 deletions phase/upgrade_controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,9 @@ func (p *UpgradeControllers) Run() error {
if err != nil {
return err
}
port := 6443
if p, ok := p.Config.Spec.K0s.Config.Dig("spec", "api", "port").(int); ok {
port = p
}

if p.IsWet() {
if err := retry.Timeout(context.TODO(), retry.DefaultTimeout, node.KubeAPIReadyFunc(h, port)); err != nil {
if err := retry.Timeout(context.TODO(), retry.DefaultTimeout, node.KubeAPIReadyFunc(h, p.Config)); err != nil {
return fmt.Errorf("kube api did not become ready: %w", err)
}
}
Expand Down
75 changes: 62 additions & 13 deletions pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package cluster

import (
"fmt"
"strings"

"github.com/creasty/defaults"
"github.com/jellydator/validation"
"github.com/k0sproject/dig"
)

// Spec defines cluster config spec section
Expand Down Expand Up @@ -80,24 +82,71 @@ func (s *Spec) Validate() error {
)
}

// KubeAPIURL returns an url to the cluster's kube api
func (s *Spec) KubeAPIURL() string {
var caddr string
func (s *Spec) clusterExternalAddress() string {
if a := s.K0s.Config.DigString("spec", "api", "externalAddress"); a != "" {
caddr = a
} else {
leader := s.K0sLeader()
if leader.PrivateAddress != "" {
caddr = leader.PrivateAddress
} else {
caddr = leader.Address()
return a
}

if cplb, ok := s.K0s.Config.Dig("spec", "network", "controlPlaneLoadBalancing").(dig.Mapping); ok {
if enabled, ok := cplb.Dig("enabled").(bool); ok && enabled {
vrrpAddresses := cplb.Dig("virtualServers").([]string)
if len(vrrpAddresses) > 0 {
return vrrpAddresses[0]
}
}
}

cport := 6443
return s.K0sLeader().Address()
}

func (s *Spec) clusterInternalAddress() string {
leader := s.K0sLeader()
if leader.PrivateAddress != "" {
return leader.PrivateAddress
} else {
return leader.Address()
}
}

const defaultAPIPort = 6443

func (s *Spec) APIPort() int {
if p, ok := s.K0s.Config.Dig("spec", "api", "port").(int); ok {
cport = p
return p
}
return defaultAPIPort
}

return fmt.Sprintf("https://%s:%d", caddr, cport)
// KubeAPIURL returns an external url to the cluster's kube API
func (s *Spec) KubeAPIURL() string {
return fmt.Sprintf("https://%s:%d", formatIPV6(s.clusterExternalAddress()), s.APIPort())
}

// InternalKubeAPIURL returns a cluster internal url to the cluster's kube API
func (s *Spec) InternalKubeAPIURL() string {
return fmt.Sprintf("https://%s:%d", formatIPV6(s.clusterInternalAddress()), s.APIPort())
}

// NodeInternalKubeAPIURL returns a cluster internal url to the node's kube API
func (s *Spec) NodeInternalKubeAPIURL(h *Host) string {
addr := "127.0.0.1"

// spec.api.onlyBindToAddress was introduced in k0s 1.30. Setting it to true will make the API server only
// listen on the IP address configured by the `address` option.
if onlyBindAddr, ok := s.K0s.Config.Dig("spec", "api", "onlyBindToAddress").(bool); ok && onlyBindAddr {
if h.PrivateAddress != "" {
addr = h.PrivateAddress
} else {
addr = h.Address()
}
}

return fmt.Sprintf("https://%s:%d", formatIPV6(addr), s.APIPort())
}

func formatIPV6(address string) string {
if strings.Contains(address, ":") {
return fmt.Sprintf("[%s]", address)
}
return address
}
5 changes: 3 additions & 2 deletions pkg/node/statusfunc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"time"

"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1"
"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster"
"github.com/k0sproject/rig/exec"

Expand Down Expand Up @@ -195,8 +196,8 @@ func ServiceStoppedFunc(h *cluster.Host, service string) retryFunc {
}

// KubeAPIReadyFunc returns a function that returns an error unless the host's local kube api responds to /version
func KubeAPIReadyFunc(h *cluster.Host, port int) retryFunc {
func KubeAPIReadyFunc(h *cluster.Host, config *v1beta1.Cluster) retryFunc {
// If the anon-auth is disabled on kube api the version endpoint will give 401
// thus we need to accept both 200 and 401 as valid statuses when checking kube api
return HTTPStatusFunc(h, fmt.Sprintf("https://localhost:%d/version", port), 200, 401)
return HTTPStatusFunc(h, fmt.Sprintf("%s/version", config.Spec.NodeInternalKubeAPIURL(h)), 200, 401)
}

0 comments on commit 4c6450d

Please sign in to comment.