Skip to content

Commit

Permalink
restart specified component or pod (#385)
Browse files Browse the repository at this point in the history
  • Loading branch information
MegaByte875 authored Nov 2, 2023
1 parent 8e8eb61 commit e05cef3
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 6 deletions.
7 changes: 5 additions & 2 deletions apis/pkg/annotation/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ const (
AnnLastAppliedStaticFlagsKey = "nebula-graph.io/last-applied-static-flags"
// AnnLastAppliedConfigKey is annotation key to indicate the last applied configuration
AnnLastAppliedConfigKey = "nebula-graph.io/last-applied-configuration"
// AnnPodSchedulingKey is pod scheduling annotation key, it represents whether the pod is scheduling
AnnPodSchedulingKey = "nebula-graph.io/pod-scheduling"
// AnnPodConfigMapHash is pod configmap hash key to update configuration dynamically
AnnPodConfigMapHash = "nebula-graph.io/cm-hash"
// AnnPvReclaimKey is the annotation key that indicates whether to reclaim the persistent volume
AnnPvReclaimKey = "nebula-graph.io/enable-pv-reclaim"

// AnnRestartTimestamp is the annotation key that indicates the timestamp at which the operator restarts the workload
AnnRestartTimestamp = "nebula-graph.io/restart-timestamp"
// AnnRestartPodOrdinal is the annotation key to indicate which Pod will be restarted
AnnRestartPodOrdinal = "nebula-graph.io/restart-ordinal"

// AnnRestoreNameKey is restore name annotation key used for creating new nebula cluster with backup data
AnnRestoreNameKey = "nebula-graph.io/restore-name"
// AnnRestoreMetadStepKey is the annotation key to control Metad reconcile process
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/component/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (c *nebulaConsole) syncConsolePod(nc *v1alpha1.NebulaCluster) error {
}
return c.clientSet.Pod().CreatePod(newPod)
}
return updatePod(c.clientSet, newPod, oldPod)
return updateSinglePod(c.clientSet, newPod, oldPod)
}

func (c *nebulaConsole) generatePod(nc *v1alpha1.NebulaCluster) *corev1.Pod {
Expand Down
15 changes: 13 additions & 2 deletions pkg/controller/component/graphd_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error {
return err
}

// TODO: validate the timestamp format
timestamp, ok := oldWorkload.GetAnnotations()[annotation.AnnRestartTimestamp]
if ok {
if err := extender.SetTemplateAnnotations(newWorkload,
map[string]string{annotation.AnnRestartTimestamp: timestamp}); err != nil {
return err
}
}

if err := c.syncNebulaClusterStatus(nc, newWorkload, oldWorkload); err != nil {
return fmt.Errorf("sync graphd cluster status failed: %v", err)
}
Expand All @@ -120,6 +129,9 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error {
return err
}
}
if err := extender.SetRestartTimestamp(newWorkload); err != nil {
return err
}
if err := extender.SetLastAppliedConfigAnnotation(newWorkload); err != nil {
return err
}
Expand Down Expand Up @@ -187,8 +199,7 @@ func (c *graphdCluster) syncNebulaClusterStatus(
lastReplicas = int32(v)
}

if updating &&
nc.Status.Metad.Phase != v1alpha1.UpdatePhase {
if updating && nc.Status.Metad.Phase != v1alpha1.UpdatePhase {
nc.Status.Graphd.Phase = v1alpha1.UpdatePhase
} else if *newReplicas < *oldReplicas || (ok && *newReplicas < lastReplicas) {
nc.Status.Graphd.Phase = v1alpha1.ScaleInPhase
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/component/graphd_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func (g *graphUpdater) Update(
return nil
}

// RestartPod is a no-op for graphd: it ignores nc and ordinal and always
// returns nil.
// NOTE(review): presumably present so graphUpdater satisfies an interface that
// requires RestartPod — confirm against the interface definition.
func (g *graphUpdater) RestartPod(nc *v1alpha1.NebulaCluster, ordinal int32) error {
return nil
}

// updateGraphdPod hands newUnstruct to setPartition, using the Pod ordinal
// (widened to int64) as the partition value and forwarding the advanced flag.
// The result of setPartition is returned as-is.
func (g *graphUpdater) updateGraphdPod(ordinal int32, newUnstruct *unstructured.Unstructured, advanced bool) error {
	partition := int64(ordinal)
	return setPartition(newUnstruct, partition, advanced)
}
2 changes: 1 addition & 1 deletion pkg/controller/component/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func podEqual(newPod, oldPod *corev1.Pod) bool {
return false
}

func updatePod(clientSet kube.ClientSet, newPod, oldPod *corev1.Pod) error {
func updateSinglePod(clientSet kube.ClientSet, newPod, oldPod *corev1.Pod) error {
isOrphan := metav1.GetControllerOf(oldPod) == nil
if podEqual(newPod, oldPod) && !isOrphan {
return nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/component/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,7 @@ type ScaleManager interface {
// UpdateManager groups the operations for updating a cluster and for
// restarting a single Pod identified by its ordinal.
type UpdateManager interface {
// Update updates the cluster
Update(nc *v1alpha1.NebulaCluster, old, new *unstructured.Unstructured, gvk schema.GroupVersionKind) error

// RestartPod restarts the Pod with the specified ordinal
RestartPod(nc *v1alpha1.NebulaCluster, ordinal int32) error
}
11 changes: 11 additions & 0 deletions pkg/controller/component/metad_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,22 @@ func (c *metadCluster) syncMetadWorkload(nc *v1alpha1.NebulaCluster) error {
return err
}

timestamp, ok := oldWorkload.GetAnnotations()[annotation.AnnRestartTimestamp]
if ok {
if err := extender.SetTemplateAnnotations(newWorkload,
map[string]string{annotation.AnnRestartTimestamp: timestamp}); err != nil {
return err
}
}

if err := c.syncNebulaClusterStatus(nc, oldWorkload); err != nil {
return fmt.Errorf("sync metad cluster status failed: %v", err)
}

if notExist {
if err := extender.SetRestartTimestamp(newWorkload); err != nil {
return err
}
if err := extender.SetLastAppliedConfigAnnotation(newWorkload); err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/component/metad_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func (m *metadUpdater) Update(
return nil
}

// RestartPod is a no-op for metad: it ignores nc and ordinal and always
// returns nil.
// NOTE(review): presumably present so metadUpdater satisfies an interface that
// requires RestartPod — confirm against the interface definition.
func (m *metadUpdater) RestartPod(nc *v1alpha1.NebulaCluster, ordinal int32) error {
return nil
}

// updateMetadPod hands newUnstruct to setPartition, using the Pod ordinal
// (widened to int64) as the partition value and forwarding the advanced flag.
// The result of setPartition is returned as-is.
func (m *metadUpdater) updateMetadPod(ordinal int32, newUnstruct *unstructured.Unstructured, advanced bool) error {
	partition := int64(ordinal)
	return setPartition(newUnstruct, partition, advanced)
}
23 changes: 23 additions & 0 deletions pkg/controller/component/storaged_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package component

import (
"fmt"
"strconv"

"github.com/vesoft-inc/nebula-go/v3/nebula/meta"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -123,6 +124,14 @@ func (c *storagedCluster) syncStoragedWorkload(nc *v1alpha1.NebulaCluster) error
return err
}

timestamp, ok := oldWorkload.GetAnnotations()[annotation.AnnRestartTimestamp]
if ok {
if err := extender.SetTemplateAnnotations(newWorkload,
map[string]string{annotation.AnnRestartTimestamp: timestamp}); err != nil {
return err
}
}

if err := c.syncNebulaClusterStatus(nc, oldWorkload); err != nil {
return fmt.Errorf("sync storaged cluster status failed: %v", err)
}
Expand All @@ -133,6 +142,9 @@ func (c *storagedCluster) syncStoragedWorkload(nc *v1alpha1.NebulaCluster) error
return err
}
}
if err := extender.SetRestartTimestamp(newWorkload); err != nil {
return err
}
if err := extender.SetLastAppliedConfigAnnotation(newWorkload); err != nil {
return err
}
Expand Down Expand Up @@ -186,6 +198,17 @@ func (c *storagedCluster) syncStoragedWorkload(nc *v1alpha1.NebulaCluster) error
}
}

oVal, ok := oldWorkload.GetAnnotations()[annotation.AnnRestartPodOrdinal]
if ok {
ordinal, err := strconv.Atoi(oVal)
if err != nil {
return err
}
if err := c.updateManager.RestartPod(nc, int32(ordinal)); err != nil {
return err
}
}

if err := c.syncStoragedPVC(nc); err != nil {
return err
}
Expand Down
58 changes: 58 additions & 0 deletions pkg/controller/component/storaged_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,64 @@ func (s *storagedUpdater) Update(
return s.updateRunningPhase(mc, nc, spaces)
}

// RestartPod restarts the storaged Pod with the given ordinal by deleting it.
//
// The Pod is deleted right away when the meta service lists no spaces or when
// force update is enabled on nc. Otherwise the Pod is first stamped with the
// TransLeaderBeginTime annotation (if it does not carry one yet) and is deleted
// only once readyToUpdate reports true for its host. Until then
// transLeaderIfNecessary is invoked and a *utilerrors.ReconcileError is
// returned.
// NOTE(review): presumably the ReconcileError makes the controller retry on a
// later reconcile — confirm against the caller.
func (s *storagedUpdater) RestartPod(nc *v1alpha1.NebulaCluster, ordinal int32) error {
	namespace := nc.GetNamespace()
	updatePodName := nc.StoragedComponent().GetPodName(ordinal)

	options, err := nebula.ClientOptions(nc, nebula.SetIsMeta(true))
	if err != nil {
		return err
	}
	endpoints := []string{nc.GetMetadThriftConnAddress()}
	mc, err := nebula.NewMetaClient(endpoints, options...)
	if err != nil {
		return err
	}
	defer func() {
		if err := mc.Disconnect(); err != nil {
			klog.Errorf("meta client disconnect failed: %v", err)
		}
	}()

	spaces, err := mc.ListSpaces()
	if err != nil {
		return err
	}
	empty := len(spaces) == 0

	// With no spaces, or when force update is enabled, skip the leader
	// transfer and delete the Pod directly.
	if empty || nc.IsForceUpdateEnabled() {
		return s.clientSet.Pod().DeletePod(namespace, updatePodName)
	}

	updatePod, err := s.clientSet.Pod().GetPod(namespace, updatePodName)
	if err != nil {
		// The format string needs one verb per argument; the previous single
		// %v rendered the namespace and appended %!(EXTRA ...) noise.
		klog.Errorf("get pod %s/%s failed: %v", namespace, updatePodName, err)
		return err
	}

	// Record when the leader transfer started, but only on the first pass so
	// that later calls keep the original start time.
	if _, ok := updatePod.Annotations[TransLeaderBeginTime]; !ok {
		if updatePod.Annotations == nil {
			updatePod.Annotations = make(map[string]string)
		}
		updatePod.Annotations[TransLeaderBeginTime] = time.Now().Format(time.RFC3339)
		if err := s.clientSet.Pod().UpdatePod(updatePod); err != nil {
			return err
		}
		klog.Infof("set pod %s annotation %v successfully", updatePod.Name, TransLeaderBeginTime)
	}

	host := nc.StoragedComponent().GetPodFQDN(ordinal)
	if s.readyToUpdate(mc, host, updatePod) {
		return s.clientSet.Pod().DeletePod(namespace, updatePodName)
	}

	if err := s.transLeaderIfNecessary(nc, mc, ordinal); err != nil {
		return &utilerrors.ReconcileError{Msg: err.Error()}
	}

	return &utilerrors.ReconcileError{Msg: fmt.Sprintf("storaged pod %s is transferring leader", updatePodName)}
}

// nolint: revive
func (s *storagedUpdater) updateStoragedPod(
mc nebula.MetaInterface,
Expand Down
15 changes: 15 additions & 0 deletions pkg/util/extender/unstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"strconv"
"strings"
"time"

apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -223,6 +224,10 @@ func UpdateWorkload(
if ok {
annotations[annotation.AnnLastReplicas] = r
}
t, ok := oldUnstruct.GetAnnotations()[annotation.AnnRestartTimestamp]
if ok {
annotations[annotation.AnnRestartTimestamp] = t
}
w.SetAnnotations(annotations)
var updateStrategy interface{}
newSpec := GetSpec(newUnstruct)
Expand Down Expand Up @@ -290,6 +295,16 @@ func SetLastAppliedConfigAnnotation(obj *unstructured.Unstructured) error {
return nil
}

// SetRestartTimestamp sets the AnnRestartTimestamp annotation on obj to the
// current time formatted as RFC 3339. The existing annotations are copied into
// a fresh map first, so all other keys are preserved. It always returns nil.
func SetRestartTimestamp(obj *unstructured.Unstructured) error {
	existing := obj.GetAnnotations()
	merged := make(map[string]string, len(existing)+1)
	for key, value := range existing {
		merged[key] = value
	}
	merged[annotation.AnnRestartTimestamp] = time.Now().Format(time.RFC3339)
	obj.SetAnnotations(merged)
	return nil
}

func SetUpdatePartition(
obj *unstructured.Unstructured,
upgradeOrdinal,
Expand Down

0 comments on commit e05cef3

Please sign in to comment.