Skip to content

Commit

Permalink
Try waiting for scheduling / system pods ready
Browse files Browse the repository at this point in the history
Signed-off-by: Kimmo Lehto <[email protected]>
  • Loading branch information
kke committed Aug 28, 2023
1 parent 060bf1f commit ae0b75f
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 7 deletions.
24 changes: 18 additions & 6 deletions phase/upgrade_controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1"
"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster"
"github.com/k0sproject/k0sctl/pkg/node"
"github.com/k0sproject/rig/exec"
"github.com/k0sproject/version"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -117,11 +116,24 @@ func (p *UpgradeControllers) Run() error {
if err := node.WaitKubeAPIReady(ctx, h, port); err != nil {
return err
}
// this stuff is here purely for temporary debugging purposes
for i := 0; i < 300; i++ {
output, _ := h.ExecOutput(h.Configurer.KubectlCmdf(h, h.K0sDataDir(), "-n kube-system get pods"), exec.Sudo(h))
log.Infof("%s: scheduler status: %s", h, output)
time.Sleep(1 * time.Second)

if NoWait {
log.Warnf("%s: skipping scheduler and system pod checks because --no-wait given", h)
return nil
}

if err := node.WaitEventsScheduled(ctx, h); err != nil {
if !Force {
return err
}
log.Warnf("%s: failed to observe scheduling events after api start-up: %s", h, err)
}

if err := node.WaitSystemPodsRunning(ctx, h); err != nil {
if !Force {
return err
}
log.Warnf("%s: failed to observe system pods running after api start-up: %s", h, err)
}
}

Expand Down
90 changes: 89 additions & 1 deletion pkg/node/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

"github.com/k0sproject/k0sctl/pkg/apis/k0sctl.k0sproject.io/v1beta1/cluster"
"github.com/k0sproject/rig/exec"

log "github.com/sirupsen/logrus"
)

func retry(ctx context.Context, f func(ctx context.Context) error) error {
Expand Down Expand Up @@ -78,7 +80,8 @@ func kubeNodeReady(h *cluster.Host) (bool, error) {

// statusEvents is the subset of a kubectl JSON event list that the wait
// helpers decode: for every item only the reason and the event time are kept.
type statusEvents struct {
	Items []struct {
		// Reason is decoded from the "reason" key of the event.
		Reason string `json:"reason"`

		// EventTime is decoded from the "eventTime" key. A JSON null leaves
		// it at the zero time.Time value.
		EventTime time.Time `json:"eventTime"`
	} `json:"items"`
}

Expand All @@ -102,6 +105,91 @@ func WaitK0sDynamicConfigReady(ctx context.Context, h *cluster.Host) error {
})
}

// WaitEventsScheduled waits, via retry, until a kube-system event with reason
// "Scheduled" and an event time not earlier than the moment this function was
// called is observed through kubectl on the host.
//
// The reference time is taken from the local clock while event times come from
// the cluster.
// NOTE(review): significant clock skew between the two could hide fresh events
// or accept stale ones - confirm this is acceptable.
//
// It returns nil once such an event is seen; otherwise it returns whatever
// retry returns for ctx.
func WaitEventsScheduled(ctx context.Context, h *cluster.Host) error {
	since := time.Now()

	return retry(ctx, func(_ context.Context) error {
		// The output is parsed as JSON below, so it must be requested explicitly
		// with "-o json". Filtering by time is done client-side because
		// "kubectl get" has no --since-time flag (that flag belongs to
		// "kubectl logs").
		output, err := h.ExecOutput(h.Configurer.KubectlCmdf(h, h.K0sDataDir(), "-n kube-system get events --field-selector reason=Scheduled -o json"), exec.Sudo(h))
		if err != nil {
			return fmt.Errorf("failed to get node events: %w", err)
		}

		events := &statusEvents{}
		if err := json.Unmarshal([]byte(output), events); err != nil {
			return fmt.Errorf("failed to decode kubectl output: %w", err)
		}

		for _, e := range events.Items {
			log.Debugf("%s: event %s at %s", h, e.Reason, e.EventTime)

			// The field selector should already guarantee this; the check is a
			// cheap safeguard in case the selector is not honored.
			if e.Reason != "Scheduled" {
				log.Debugf("%s: skipping event %s - wrong event reason", h, e.Reason)
				continue
			}
			// Events from before the call (including ones whose event time
			// decoded to the zero value) do not prove that scheduling works now.
			if e.EventTime.Before(since) {
				log.Debugf("%s: skipping event %s at %s - too old", h, e.Reason, e.EventTime)
				continue
			}
			return nil
		}

		return fmt.Errorf("node not scheduled")
	})
}

// podStatusList is the subset of a kubectl JSON pod list that the wait helpers
// decode: per pod, the phase and the per-container readiness information.
type podStatusList struct {
	Items []struct {
		Status struct {
			// Phase is decoded from "status.phase" of the pod.
			Phase string `json:"phase"`

			// ContainerStatuses is decoded from "status.containerStatuses".
			ContainerStatuses []struct {
				ContainerID string `json:"containerID"`
				Name        string `json:"name"`
				Ready       bool   `json:"ready"`
			} `json:"containerStatuses"`
		} `json:"status"`
	} `json:"items"`
}

// WaitSystemPodsRunning waits, via retry, until the kube-system pods reported
// by kubectl on the host look healthy.
//
// A single attempt succeeds when at least one pod is in phase "Running" and
// every container of every Running pod reports ready. Pods in any other phase
// are logged at debug level and otherwise ignored, so they neither count as
// running nor cause the attempt to fail.
//
// It returns nil once an attempt succeeds; otherwise it returns whatever retry
// returns for ctx.
func WaitSystemPodsRunning(ctx context.Context, h *cluster.Host) error {
	return retry(ctx, func(_ context.Context) error {
		// The format string contains no verbs, so no additional format
		// arguments are passed. A stray argument would be rendered into the
		// command line as "%!(EXTRA ...)" if KubectlCmdf formats printf-style.
		output, err := h.ExecOutput(h.Configurer.KubectlCmdf(h, h.K0sDataDir(), "-n kube-system get pods -o json"), exec.Sudo(h))
		if err != nil {
			return fmt.Errorf("failed to get kube-system pods: %w", err)
		}

		pods := &podStatusList{}
		if err := json.Unmarshal([]byte(output), pods); err != nil {
			return fmt.Errorf("failed to decode kubectl output: %w", err)
		}

		var running, notReady int

		for _, p := range pods.Items {
			if p.Status.Phase != "Running" {
				log.Debugf("%s: pod phase '%s' - container statuses: %+v", h, p.Status.Phase, p.Status.ContainerStatuses)
				continue
			}
			running++

			for _, c := range p.Status.ContainerStatuses {
				if !c.Ready {
					log.Debugf("%s: container %s not ready", h, c.Name)
					notReady++
				}
			}
		}

		// An empty list, or one with only non-Running pods, is not a success.
		if running == 0 {
			return fmt.Errorf("no pods running")
		}

		if notReady > 0 {
			return fmt.Errorf("%d system pod containers not ready", notReady)
		}

		return nil
	})
}

// WaitHTTPStatus waits on the node until http status received for a GET from the URL is the
// expected one or context is cancelled
func WaitHTTPStatus(ctx context.Context, h *cluster.Host, url string, expected ...int) error {
Expand Down

0 comments on commit ae0b75f

Please sign in to comment.