Skip to content

Commit

Permalink
Yurtctl: add precheck for reducing convert failure (#675)
Browse files Browse the repository at this point in the history
  • Loading branch information
Peeknut authored Dec 28, 2021
1 parent c23c8d1 commit 4455011
Show file tree
Hide file tree
Showing 9 changed files with 848 additions and 450 deletions.
2 changes: 1 addition & 1 deletion cmd/yurt-node-servant/preflight-convert/preflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,6 @@ func setFlags(cmd *cobra.Command) {
cmd.Flags().String("yurt-tunnel-agent-image", latestYurtTunnelAgentImage, "The yurt-tunnel-agent image.")
cmd.Flags().BoolP("deploy-yurttunnel", "t", false, "If set, yurt-tunnel-agent will be deployed.")
cmd.Flags().String("ignore-preflight-errors", "", "A list of checks whose errors will be shown as warnings. "+
"And value needs to be lowercase. Example: 'isprivilegeduser,imagepull'.Value 'all' ignores errors from all checks.",
"Example: 'isprivilegeduser,imagepull'.Value 'all' ignores errors from all checks.",
)
}
2 changes: 1 addition & 1 deletion pkg/node-servant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ spec:
- /bin/sh
- -c
args:
- "/usr/local/bin/entry.sh preflight-convert"
- "/usr/local/bin/entry.sh preflight-convert if {{.ignore_preflight_errors}} {{if .ignore_preflight_errors}}--ignore-preflight-errors {{.ignore_preflight_errors}} {{end}}"
securityContext:
privileged: true
volumeMounts:
Expand Down
1 change: 1 addition & 0 deletions pkg/node-servant/preflight-convert/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (o *Options) Complete(flags *pflag.FlagSet) error {
return err
}
if ipStr != "" {
ipStr = strings.ToLower(ipStr)
o.IgnorePreflightErrors = sets.NewString(strings.Split(ipStr, ",")...)
}

Expand Down
177 changes: 177 additions & 0 deletions pkg/preflight/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,28 @@ package preflight

import (
"bytes"
"context"
"fmt"
"io"
"net"
"os"
"strings"
"sync"
"time"

"github.com/pkg/errors"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
utilsexec "k8s.io/utils/exec"

nodeservant "github.com/openyurtio/openyurt/pkg/node-servant"
"github.com/openyurtio/openyurt/pkg/node-servant/components"
"github.com/openyurtio/openyurt/pkg/projectinfo"
kubeutil "github.com/openyurtio/openyurt/pkg/yurtctl/util/kubernetes"
)

// Error defines struct for communicating error messages generated by preflight-convert-convert checks
Expand Down Expand Up @@ -72,6 +81,121 @@ func (ipuc IsPrivilegedUserCheck) Check() (warnings, errorList []error) {
return nil, nil
}

// NodeReadyCheck checks the nodes status whether is ready.
type NodeReadyCheck struct {
NodeLst *v1.NodeList
}

func (NodeReadyCheck) Name() string {
return "NodeReady"
}

func (nrc NodeReadyCheck) Check() (warnings, errorList []error) {
klog.V(1).Infoln("validating node status")
var notReadyNodeNames []string
for _, node := range nrc.NodeLst.Items {
if !isNodeReady(&node.Status) {
notReadyNodeNames = append(notReadyNodeNames, node.Name)
}
}
if len(notReadyNodeNames) != 0 {
return nil, []error{errors.Errorf("the status of nodes: %s is not 'Ready'", notReadyNodeNames)}
}
return nil, nil
}

// NodeEdgeWorkerLabelCheck checks whether the node contains edgeWorker label which represents an OpenYurt node.
type NodeEdgeWorkerLabelCheck struct {
NodeLst *v1.NodeList
}

func (NodeEdgeWorkerLabelCheck) Name() string {
return "NodeEdgeWorkerLabel"
}

func (nlc NodeEdgeWorkerLabelCheck) Check() (warnings, errorList []error) {
klog.V(1).Infoln("validating node edgeworker label")
var hasLabelNodeNames []string
for _, node := range nlc.NodeLst.Items {
if _, ok := node.Labels[projectinfo.GetEdgeWorkerLabelKey()]; ok {
hasLabelNodeNames = append(hasLabelNodeNames, node.Name)
}
}
if len(hasLabelNodeNames) != 0 {
return nil, []error{errors.Errorf("the nodes %s has already been labeled as a OpenYurt node", hasLabelNodeNames)}
}
return nil, nil
}

// JobExistCheck checks whether the jobs with a specific prefix exist.
type JobExistCheck struct {
JobLst *batchv1.JobList
Prefix string
Label string
}

func (jc JobExistCheck) Name() string {
if jc.Label != "" {
return jc.Label
}
return fmt.Sprintf("JobExist-%s", jc.Prefix)
}

func (jc JobExistCheck) Check() (warnings, errorList []error) {
klog.V(1).Infoln("validating convert jobs")
var invalidJobNames []string
for _, job := range jc.JobLst.Items {
if strings.HasPrefix(job.Name, jc.Prefix) {
invalidJobNames = append(invalidJobNames, job.Name)
}
}
if len(invalidJobNames) != 0 {
return nil, []error{errors.Errorf("jobs %s has prefix %s, may conflict with the conversion job name", invalidJobNames, jc.Prefix)}
}
return nil, nil
}

// NodeServantJobCheck create jobs to do preflight checks.
// After the job is successfully executed, it will be deleted.
// The failed job will not be deleted, and the user needs to delete it manually.
type NodeServantJobCheck struct {
cliSet *kubernetes.Clientset
jobLst []*batchv1.Job

waitServantJobTimeout time.Duration
checkServantJobPeriod time.Duration
}

func (NodeServantJobCheck) Name() string {
return "NodeServantJob"
}

func (nc NodeServantJobCheck) Check() (warnings []error, errorList []error) {
var wg sync.WaitGroup

res := make(chan error, len(nc.jobLst))
for _, job := range nc.jobLst {
wg.Add(1)
entity := *job
go func() {
defer wg.Done()
if err := kubeutil.RunJobAndCleanup(nc.cliSet, &entity,
nc.waitServantJobTimeout, nc.checkServantJobPeriod); err != nil {
msg := fmt.Errorf("fail to run servant job(%s): %s\n", entity.GetName(), err)
res <- msg
} else {
klog.V(1).Infof("servant job(%s) has succeeded\n", entity.GetName())
}
}()
}
wg.Wait()
close(res)
for m := range res {
errorList = append(errorList, m)
}
return nil, errorList
}

// FileExistingCheck checks that the given file already exist.
type FileExistingCheck struct {
Path string
Expand Down Expand Up @@ -227,6 +351,40 @@ func (ipc ImagePullCheck) Check() (warnings, errorList []error) {
return warnings, errorList
}

// RunConvertClusterChecks excutes all cluster-level checks.
func RunConvertClusterChecks(cliSet *kubernetes.Clientset, ignorePreflightErrors sets.String) error {
nodeLst, err := cliSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
if err != nil {
return err
}

jobLst, err := cliSet.BatchV1().Jobs("kube-system").List(context.Background(), metav1.ListOptions{})
if err != nil {
return err
}

checks := []Checker{
NodeReadyCheck{NodeLst: nodeLst},
NodeEdgeWorkerLabelCheck{NodeLst: nodeLst},
JobExistCheck{JobLst: jobLst, Prefix: kubeutil.DisableNodeControllerJobNameBase, Label: "DisableNodeController"},
JobExistCheck{JobLst: jobLst, Prefix: nodeservant.ConvertJobNameBase, Label: "NodeServantConvert"},
JobExistCheck{JobLst: jobLst, Prefix: nodeservant.ConvertPreflightJobNameBase, Label: "NodeServantPreflightCheck"},
}
return RunChecks(checks, os.Stderr, ignorePreflightErrors)
}

// RunNodeServantJobCheck runs preflight-check job for each node.
func RunNodeServantJobCheck(
cliSet *kubernetes.Clientset, jobLst []*batchv1.Job,
waitServantJobTimeout time.Duration, checkServantJobPeriod time.Duration,
ignorePreflightErrors sets.String) error {

checks := []Checker{
NodeServantJobCheck{cliSet: cliSet, jobLst: jobLst, waitServantJobTimeout: waitServantJobTimeout, checkServantJobPeriod: checkServantJobPeriod},
}
return RunChecks(checks, os.Stderr, ignorePreflightErrors)
}

func RunConvertNodeChecks(o KubePathOperator, ignorePreflightErrors sets.String, deployTunnel bool) error {
// First, check if we're root separately from the other preflight-convert-convert checks and fail fast
if err := RunRootCheckOnly(ignorePreflightErrors); err != nil {
Expand Down Expand Up @@ -307,3 +465,22 @@ func setHasItemOrAll(s sets.String, item string) bool {
}
return false
}

func isNodeReady(status *v1.NodeStatus) bool {
_, condition := getNodeCondition(status, v1.NodeReady)
return condition != nil && condition.Status == v1.ConditionTrue
}

// getNodeCondition extracts the provided condition from the given status and returns that.
// Returns nil and -1 if the condition is not present, and the index of the located condition.
func getNodeCondition(status *v1.NodeStatus, conditionType v1.NodeConditionType) (int, *v1.NodeCondition) {
if status == nil {
return -1, nil
}
for i := range status.Conditions {
if status.Conditions[i].Type == conditionType {
return i, &status.Conditions[i]
}
}
return -1, nil
}
4 changes: 3 additions & 1 deletion pkg/preflight/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ limitations under the License.

package preflight

import v1 "k8s.io/api/core/v1"
import (
v1 "k8s.io/api/core/v1"
)

type ImageOperator interface {
GetCRISocket() string
Expand Down
Loading

0 comments on commit 4455011

Please sign in to comment.