From ab91f810f14995c88d7db7107412d0ade366d076 Mon Sep 17 00:00:00 2001 From: MegaByte875 Date: Thu, 2 Nov 2023 10:23:11 +0800 Subject: [PATCH] restart specified component or pod --- apis/pkg/annotation/annotation.go | 7 ++- pkg/controller/component/console.go | 2 +- pkg/controller/component/graphd_cluster.go | 15 ++++- pkg/controller/component/graphd_updater.go | 4 ++ pkg/controller/component/helper.go | 2 +- pkg/controller/component/interface.go | 3 + pkg/controller/component/metad_cluster.go | 11 ++++ pkg/controller/component/metad_updater.go | 4 ++ pkg/controller/component/storaged_cluster.go | 23 ++++++++ pkg/controller/component/storaged_updater.go | 58 ++++++++++++++++++++ pkg/util/extender/unstructured.go | 15 +++++ 11 files changed, 138 insertions(+), 6 deletions(-) diff --git a/apis/pkg/annotation/annotation.go b/apis/pkg/annotation/annotation.go index ab9fe52e..c32a84e2 100644 --- a/apis/pkg/annotation/annotation.go +++ b/apis/pkg/annotation/annotation.go @@ -34,13 +34,16 @@ const ( AnnLastAppliedStaticFlagsKey = "nebula-graph.io/last-applied-static-flags" // AnnLastAppliedConfigKey is annotation key to indicate the last applied configuration AnnLastAppliedConfigKey = "nebula-graph.io/last-applied-configuration" - // AnnPodSchedulingKey is pod scheduling annotation key, it represents whether the pod is scheduling - AnnPodSchedulingKey = "nebula-graph.io/pod-scheduling" // AnnPodConfigMapHash is pod configmap hash key to update configuration dynamically AnnPodConfigMapHash = "nebula-graph.io/cm-hash" // AnnPvReclaimKey is annotation key that indicate whether reclaim persistent volume AnnPvReclaimKey = "nebula-graph.io/enable-pv-reclaim" + // AnnRestartTimestamp is annotation key to indicate the timestamp that operator restart the workload + AnnRestartTimestamp = "nebula-graph.io/restart-timestamp" + // AnnRestartPodOrdinal is the annotation key to indicate which Pod will be restarted + AnnRestartPodOrdinal = "nebula-graph.io/restart-ordinal" + // AnnRestoreNameKey is restore name annotation key used for creating new nebula cluster with backup data AnnRestoreNameKey = "nebula-graph.io/restore-name" // AnnRestoreMetadStepKey is the annotation key to control Metad reconcile process diff --git a/pkg/controller/component/console.go b/pkg/controller/component/console.go index cac8893f..075b906f 100644 --- a/pkg/controller/component/console.go +++ b/pkg/controller/component/console.go @@ -68,7 +68,7 @@ func (c *nebulaConsole) syncConsolePod(nc *v1alpha1.NebulaCluster) error { } return c.clientSet.Pod().CreatePod(newPod) } - return updatePod(c.clientSet, newPod, oldPod) + return updateSinglePod(c.clientSet, newPod, oldPod) } func (c *nebulaConsole) generatePod(nc *v1alpha1.NebulaCluster) *corev1.Pod { diff --git a/pkg/controller/component/graphd_cluster.go b/pkg/controller/component/graphd_cluster.go index 79f8b018..1176e2fc 100644 --- a/pkg/controller/component/graphd_cluster.go +++ b/pkg/controller/component/graphd_cluster.go @@ -110,6 +110,15 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error { return err } + // TODO: validate the timestamp format + timestamp, ok := oldWorkload.GetAnnotations()[annotation.AnnRestartTimestamp] + if ok { + if err := extender.SetTemplateAnnotations(newWorkload, + map[string]string{annotation.AnnRestartTimestamp: timestamp}); err != nil { + return err + } + } + if err := c.syncNebulaClusterStatus(nc, newWorkload, oldWorkload); err != nil { return fmt.Errorf("sync graphd cluster status failed: %v", err) } @@ -120,6 +129,9 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error { return err } } + if err := extender.SetRestartTimestamp(newWorkload); err != nil { + return err + } if err := extender.SetLastAppliedConfigAnnotation(newWorkload); err != nil { return err } @@ -187,8 +199,7 @@ func (c *graphdCluster) syncNebulaClusterStatus( lastReplicas = int32(v) } - if updating && - nc.Status.Metad.Phase != v1alpha1.UpdatePhase { + if updating && nc.Status.Metad.Phase != v1alpha1.UpdatePhase { nc.Status.Graphd.Phase = v1alpha1.UpdatePhase } else if *newReplicas < *oldReplicas || (ok && *newReplicas < lastReplicas) { nc.Status.Graphd.Phase = v1alpha1.ScaleInPhase diff --git a/pkg/controller/component/graphd_updater.go b/pkg/controller/component/graphd_updater.go index 1ab07629..e98f3a86 100644 --- a/pkg/controller/component/graphd_updater.go +++ b/pkg/controller/component/graphd_updater.go @@ -81,6 +81,10 @@ func (g *graphUpdater) Update( return nil } +func (g *graphUpdater) RestartPod(nc *v1alpha1.NebulaCluster, ordinal int32) error { + return nil +} + func (g *graphUpdater) updateGraphdPod(ordinal int32, newUnstruct *unstructured.Unstructured, advanced bool) error { return setPartition(newUnstruct, int64(ordinal), advanced) } diff --git a/pkg/controller/component/helper.go b/pkg/controller/component/helper.go index 6a2ddda0..a0560604 100644 --- a/pkg/controller/component/helper.go +++ b/pkg/controller/component/helper.go @@ -475,7 +475,7 @@ func podEqual(newPod, oldPod *corev1.Pod) bool { return false } -func updatePod(clientSet kube.ClientSet, newPod, oldPod *corev1.Pod) error { +func updateSinglePod(clientSet kube.ClientSet, newPod, oldPod *corev1.Pod) error { isOrphan := metav1.GetControllerOf(oldPod) == nil if podEqual(newPod, oldPod) && !isOrphan { return nil diff --git a/pkg/controller/component/interface.go b/pkg/controller/component/interface.go index 07e94f34..fef5b0da 100644 --- a/pkg/controller/component/interface.go +++ b/pkg/controller/component/interface.go @@ -40,4 +40,7 @@ type ScaleManager interface { type UpdateManager interface { // Update updates the cluster Update(nc *v1alpha1.NebulaCluster, old, new *unstructured.Unstructured, gvk schema.GroupVersionKind) error + + // RestartPod restart the specified Pod + RestartPod(nc *v1alpha1.NebulaCluster, ordinal int32) error } diff --git a/pkg/controller/component/metad_cluster.go b/pkg/controller/component/metad_cluster.go index 0020de7b..1e300881 100644 --- a/pkg/controller/component/metad_cluster.go +++ b/pkg/controller/component/metad_cluster.go @@ -118,11 +118,22 @@ func (c *metadCluster) syncMetadWorkload(nc *v1alpha1.NebulaCluster) error { return err } + timestamp, ok := oldWorkload.GetAnnotations()[annotation.AnnRestartTimestamp] + if ok { + if err := extender.SetTemplateAnnotations(newWorkload, + map[string]string{annotation.AnnRestartTimestamp: timestamp}); err != nil { + return err + } + } + if err := c.syncNebulaClusterStatus(nc, oldWorkload); err != nil { return fmt.Errorf("sync metad cluster status failed: %v", err) } if notExist { + if err := extender.SetRestartTimestamp(newWorkload); err != nil { + return err + } if err := extender.SetLastAppliedConfigAnnotation(newWorkload); err != nil { return err } diff --git a/pkg/controller/component/metad_updater.go b/pkg/controller/component/metad_updater.go index 3cc0e06f..a476c699 100644 --- a/pkg/controller/component/metad_updater.go +++ b/pkg/controller/component/metad_updater.go @@ -76,6 +76,10 @@ func (m *metadUpdater) Update( return nil } +func (m *metadUpdater) RestartPod(nc *v1alpha1.NebulaCluster, ordinal int32) error { + return nil +} + func (m *metadUpdater) updateMetadPod(ordinal int32, newUnstruct *unstructured.Unstructured, advanced bool) error { return setPartition(newUnstruct, int64(ordinal), advanced) } diff --git a/pkg/controller/component/storaged_cluster.go b/pkg/controller/component/storaged_cluster.go index ebb838b4..be9c9e3f 100644 --- a/pkg/controller/component/storaged_cluster.go +++ b/pkg/controller/component/storaged_cluster.go @@ -18,6 +18,7 @@ package component import ( "fmt" + "strconv" "github.com/vesoft-inc/nebula-go/v3/nebula/meta" corev1 "k8s.io/api/core/v1" @@ -123,6 +124,14 @@ func (c *storagedCluster) syncStoragedWorkload(nc *v1alpha1.NebulaCluster) error return err } + timestamp, ok := oldWorkload.GetAnnotations()[annotation.AnnRestartTimestamp] + if ok { + if err := extender.SetTemplateAnnotations(newWorkload, + map[string]string{annotation.AnnRestartTimestamp: timestamp}); err != nil { + return err + } + } + if err := c.syncNebulaClusterStatus(nc, oldWorkload); err != nil { return fmt.Errorf("sync storaged cluster status failed: %v", err) } @@ -133,6 +142,9 @@ func (c *storagedCluster) syncStoragedWorkload(nc *v1alpha1.NebulaCluster) error return err } } + if err := extender.SetRestartTimestamp(newWorkload); err != nil { + return err + } if err := extender.SetLastAppliedConfigAnnotation(newWorkload); err != nil { return err } @@ -186,6 +198,17 @@ func (c *storagedCluster) syncStoragedWorkload(nc *v1alpha1.NebulaCluster) error } } + oVal, ok := oldWorkload.GetAnnotations()[annotation.AnnRestartPodOrdinal] + if ok { + ordinal, err := strconv.Atoi(oVal) + if err != nil { + return err + } + if err := c.updateManager.RestartPod(nc, int32(ordinal)); err != nil { + return err + } + } + if err := c.syncStoragedPVC(nc); err != nil { return err } diff --git a/pkg/controller/component/storaged_updater.go b/pkg/controller/component/storaged_updater.go index 01ca6482..44065964 100644 --- a/pkg/controller/component/storaged_updater.go +++ b/pkg/controller/component/storaged_updater.go @@ -126,6 +126,64 @@ func (s *storagedUpdater) Update( return s.updateRunningPhase(mc, nc, spaces) } +func (s *storagedUpdater) RestartPod(nc *v1alpha1.NebulaCluster, ordinal int32) error { + namespace := nc.GetNamespace() + updatePodName := nc.StoragedComponent().GetPodName(ordinal) + options, err := nebula.ClientOptions(nc, nebula.SetIsMeta(true)) + if err != nil { + return err + } + endpoints := []string{nc.GetMetadThriftConnAddress()} + mc, err := nebula.NewMetaClient(endpoints, options...) + if err != nil { + return err + } + defer func() { + if err := mc.Disconnect(); err != nil { + klog.Errorf("meta client disconnect failed: %v", err) + } + }() + + spaces, err := mc.ListSpaces() + if err != nil { + return err + } + empty := len(spaces) == 0 + + if empty || nc.IsForceUpdateEnabled() { + return s.clientSet.Pod().DeletePod(namespace, updatePodName) + } + + updatePod, err := s.clientSet.Pod().GetPod(namespace, updatePodName) + if err != nil { + klog.Errorf("get pod failed: %v", namespace, updatePodName, err) + return err + } + _, ok := updatePod.Annotations[TransLeaderBeginTime] + if !ok { + if updatePod.Annotations == nil { + updatePod.Annotations = make(map[string]string, 0) + } + now := time.Now().Format(time.RFC3339) + updatePod.Annotations[TransLeaderBeginTime] = now + if err := s.clientSet.Pod().UpdatePod(updatePod); err != nil { + return err + } + klog.Infof("set pod %s annotation %v successfully", updatePod.Name, TransLeaderBeginTime) + } + + host := nc.StoragedComponent().GetPodFQDN(ordinal) + if s.readyToUpdate(mc, host, updatePod) { + return s.clientSet.Pod().DeletePod(namespace, updatePodName) + } + + if err := s.transLeaderIfNecessary(nc, mc, ordinal); err != nil { + return &utilerrors.ReconcileError{Msg: fmt.Sprintf("%v", err)} + } + + return &utilerrors.ReconcileError{Msg: fmt.Sprintf("storaged pod %s is transferring leader", updatePodName)} +} + // nolint: revive func (s *storagedUpdater) updateStoragedPod( mc nebula.MetaInterface, diff --git a/pkg/util/extender/unstructured.go b/pkg/util/extender/unstructured.go index 90695ecc..7d8a5104 100644 --- a/pkg/util/extender/unstructured.go +++ b/pkg/util/extender/unstructured.go @@ -20,6 +20,7 @@ import ( "encoding/json" "strconv" "strings" + "time" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -223,6 +224,10 @@ func UpdateWorkload( if ok { annotations[annotation.AnnLastReplicas] = r } + t, ok := oldUnstruct.GetAnnotations()[annotation.AnnRestartTimestamp] + if ok { + annotations[annotation.AnnRestartTimestamp] = t + } w.SetAnnotations(annotations) var updateStrategy interface{} newSpec := GetSpec(newUnstruct) @@ -290,6 +295,16 @@ func SetLastAppliedConfigAnnotation(obj *unstructured.Unstructured) error { return nil } +func SetRestartTimestamp(obj *unstructured.Unstructured) error { + annotations := make(map[string]string) + for k, v := range obj.GetAnnotations() { + annotations[k] = v + } + annotations[annotation.AnnRestartTimestamp] = time.Now().Format(time.RFC3339) + obj.SetAnnotations(annotations) + return nil +} + func SetUpdatePartition( obj *unstructured.Unstructured, upgradeOrdinal,