Skip to content

Commit

Permalink
verify static pod status
Browse files Browse the repository at this point in the history
Signed-off-by: hxcGit <[email protected]>
  • Loading branch information
xavier-hou committed Apr 7, 2023
1 parent b128664 commit f391d0d
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 19 deletions.
21 changes: 14 additions & 7 deletions cmd/yurt-node-servant/static-pod-upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@ package upgrade

import (
"fmt"
"github.com/spf13/pflag"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/klog/v2"

upgrade "github.com/openyurtio/openyurt/pkg/static-pod-upgrade"
upgrade "github.com/openyurtio/openyurt/pkg/node-servant/static-pod-upgrade"
)

var (
manifest string
mode string
name string
namespace string
manifest string
hash string
mode string
)

// NewUpgradeCmd generates a new upgrade command
Expand All @@ -45,7 +48,7 @@ func NewUpgradeCmd() *cobra.Command {
klog.Fatalf("Fail to validate static pod upgrade args, %v", err)
}

ctrl, err := upgrade.New(manifest, mode)
ctrl, err := upgrade.New(name, namespace, manifest, hash, mode)
if err != nil {
klog.Fatalf("Fail to create static-pod-upgrade controller, %v", err)
}
Expand All @@ -64,14 +67,18 @@ func NewUpgradeCmd() *cobra.Command {
}

func addFlags(cmd *cobra.Command) {
cmd.Flags().StringVar(&name, "name", "", "The name of static pod which needs be upgraded")
cmd.Flags().StringVar(&namespace, "namespace", "", "The namespace of static pod which needs be upgraded")
cmd.Flags().StringVar(&manifest, "manifest", "", "The manifest file name of static pod which needs be upgraded")
cmd.Flags().StringVar(&hash, "hash", "", "The hash value of new static pod specification")
cmd.Flags().StringVar(&mode, "mode", "", "The upgrade mode which is used")
}

// Validate check if all the required arguments are valid
func validate() error {
if manifest == "" || mode == "" {
return fmt.Errorf("args can not be empty, manifest is %s, mode is %s", manifest, mode)
if name == "" || namespace == "" || manifest == "" || hash == "" || mode == "" {
return fmt.Errorf("args can not be empty, name is %s, namespace is %s,manifest is %s, hash is %s,mode is %s",
name, namespace, manifest, hash, mode)
}

// TODO: use constant value of static-pod controller
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@ import (
"fmt"
"os"
"path/filepath"
"time"

"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/node-servant/static-pod-upgrade/util"
)

const (
DefaultUpgradeDir = "openyurtio-upgrade"
DefaultUpgradeDir = "openyurtio-upgrade"
DefaultStaticPodRunningCheckTimeout = 2 * time.Minute

// TODO: use constant value of static-pod controller
OTA = "ota"
Expand All @@ -38,8 +42,14 @@ var (
)

type Controller struct {
// Name of static pod
name string
// Namespace of static pod
namespace string
// Manifest file name of static pod
manifest string
// The latest static pod hash
hash string
// Only support `OTA` and `Auto`
upgradeMode string

Expand All @@ -53,16 +63,19 @@ type Controller struct {
upgradeManifestPath string
}

func New(manifest, mode string) (*Controller, error) {
func New(name, namespace, manifest, hash, mode string) (*Controller, error) {
ctrl := &Controller{
name: name,
namespace: namespace,
manifest: manifest,
hash: hash,
upgradeMode: mode,
}

ctrl.manifestPath = filepath.Join(DefaultManifestPath, WithYamlSuffix(ctrl.manifest))
ctrl.bakManifestPath = filepath.Join(DefaultManifestPath, DefaultUpgradeDir, WithBackupSuffix(ctrl.manifest))
ctrl.manifestPath = filepath.Join(DefaultManifestPath, util.WithYamlSuffix(ctrl.manifest))
ctrl.bakManifestPath = filepath.Join(DefaultManifestPath, DefaultUpgradeDir, util.WithBackupSuffix(ctrl.manifest))
ctrl.configMapDataPath = filepath.Join(DefaultConfigmapPath, ctrl.manifest)
ctrl.upgradeManifestPath = filepath.Join(DefaultManifestPath, DefaultUpgradeDir, WithUpgradeSuffix(ctrl.manifest))
ctrl.upgradeManifestPath = filepath.Join(DefaultManifestPath, DefaultUpgradeDir, util.WithUpgradeSuffix(ctrl.manifest))

return ctrl, nil
}
Expand Down Expand Up @@ -102,6 +115,16 @@ func (ctrl *Controller) AutoUpgrade() error {
}
klog.Info("Auto upgrade replaceManifest success")

// (3) Verify the new static pod is running
ok, err := ctrl.verify()
if err != nil {
return err
}
if !ok {
return fmt.Errorf("the latest static pod is not running")
}
klog.Info("Auto upgrade verify success")

return nil
}

Expand Down Expand Up @@ -129,15 +152,21 @@ func (ctrl *Controller) prepareManifest() error {
}
}

return CopyFile(ctrl.configMapDataPath, ctrl.upgradeManifestPath)
return util.CopyFile(ctrl.configMapDataPath, ctrl.upgradeManifestPath)
}

// backUpManifest backup the old manifest in order to roll back when errors occur
func (ctrl *Controller) backupManifest() error {
return CopyFile(ctrl.manifestPath, ctrl.bakManifestPath)
return util.CopyFile(ctrl.manifestPath, ctrl.bakManifestPath)
}

// replaceManifest replace old manifest with the latest one, it achieves static pod upgrade
func (ctrl *Controller) replaceManifest() error {
return CopyFile(ctrl.upgradeManifestPath, ctrl.manifestPath)
return util.CopyFile(ctrl.upgradeManifestPath, ctrl.manifestPath)
}

// verify make sure the latest static pod is running
// return false when the latest static pod failed or check status time out
func (ctrl *Controller) verify() (bool, error) {
return util.WaitForPodRunning(ctrl.namespace, ctrl.name, ctrl.hash, DefaultStaticPodRunningCheckTimeout)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package upgrade
import (
"os"
"path/filepath"
"strings"
"testing"

corev1 "k8s.io/api/core/v1"
Expand All @@ -27,6 +28,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"

upgradeUtil "github.com/openyurtio/openyurt/pkg/node-servant/static-pod-upgrade/util"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

Expand All @@ -40,7 +42,7 @@ func Test(t *testing.T) {
// Temporarily modify the manifest path in order to test
DefaultManifestPath = t.TempDir()
DefaultConfigmapPath = t.TempDir()
_, _ = os.Create(filepath.Join(DefaultManifestPath, WithYamlSuffix(TestManifest)))
_, _ = os.Create(filepath.Join(DefaultManifestPath, upgradeUtil.WithYamlSuffix(TestManifest)))
_, _ = os.Create(filepath.Join(DefaultConfigmapPath, TestManifest))

runningStaticPod := &corev1.Pod{
Expand Down Expand Up @@ -70,13 +72,15 @@ func Test(t *testing.T) {
/*
2. Test
*/
ctrl, err := New(TestManifest, mode)
ctrl, err := New(TestPodName, metav1.NamespaceDefault, TestManifest, TestHashValue, mode)
if err != nil {
t.Errorf("Fail to get upgrade controller, %v", err)
}

if err := ctrl.Upgrade(); err != nil {
t.Errorf("Fail to upgrade, %v", err)
if strings.Contains(err.Error(), "fail to access yurthub pods API") {
t.Errorf("Fail to upgrade, %v", err)
}
}

/*
Expand Down
92 changes: 92 additions & 0 deletions pkg/node-servant/static-pod-upgrade/util/pods.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
Copyright 2023 The OpenYurt Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"fmt"
"io/ioutil"
"net/http"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
)

const (
YurtHubAddress = "http://127.0.0.1:10267"
YurtHubAPIPath = "/pods"
)

func GetPodFromYurtHub(namespace, name string) (*v1.Pod, error) {
podList, err := GetPodsFromYurtHub(YurtHubAddress + YurtHubAPIPath)
if err != nil {
return nil, err
}

for i, pod := range podList.Items {
if pod.Namespace == namespace && pod.Name == name {
return &podList.Items[i], nil
}
}

return nil, fmt.Errorf("fail to find pod %s/%s", namespace, name)
}

func GetPodsFromYurtHub(url string) (*v1.PodList, error) {
data, err := getPodsDataFromYurtHub(url)
if err != nil {
return nil, err
}

podList, err := decodePods(data)
if err != nil {
return nil, err
}

return podList, nil
}

func getPodsDataFromYurtHub(url string) ([]byte, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("fail to access yurthub pods API, returned status: %v", resp.Status)
}

data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}

return data, nil
}

func decodePods(data []byte) (*v1.PodList, error) {
codecFactory := serializer.NewCodecFactory(runtime.NewScheme())
codec := codecFactory.LegacyCodec(schema.GroupVersion{Group: v1.GroupName, Version: "v1"})

podList := new(v1.PodList)
if _, _, err := codec.Decode(data, nil, podList); err != nil {
return nil, fmt.Errorf("failed to decode pod list: %s", err)
}
return podList, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,20 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package upgrade
package util

import (
"context"
"fmt"
"io"
"os"
"strings"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"

k8sutil "github.com/openyurtio/openyurt/pkg/controller/daemonpodupdater/kubernetes"
)

const (
Expand Down Expand Up @@ -61,3 +70,47 @@ func CopyFile(src, dst string) error {
}
return nil
}

// WaitForPodRunning waits static pod to run
// Success: Static pod annotation `StaticPodHashAnnotation` value equals to function argument hash
// Failed: Receive PodFailed event
func WaitForPodRunning(namespace, name, hash string, timeout time.Duration) (bool, error) {
klog.Infof("WaitForPodRuning name is %s, namespace is %s", namespace, name)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

checkPod := func(pod *v1.Pod) (hasResult, result bool) {
h := pod.Annotations[StaticPodHashAnnotation]
if k8sutil.IsPodReady(pod) && pod.Status.Phase == v1.PodRunning && h == hash {
return true, true
}

if pod.Status.Phase == v1.PodFailed {
return true, false
}

return false, false
}

for {
select {
case <-ctx.Done():
return false, fmt.Errorf("timeout waiting for static pod %s/%s to be running", namespace, name)
default:
pod, err := GetPodFromYurtHub(namespace, name)
if err != nil {
if !strings.Contains(err.Error(), "fail to find pod") {
return false, err
}
}
if pod != nil {
hasResult, result := checkPod(pod)
if hasResult {
return result, nil
}
}

time.Sleep(10 * time.Second)
}
}
}

0 comments on commit f391d0d

Please sign in to comment.