Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

restart specified component or pod #385

Merged
merged 1 commit into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions apis/pkg/annotation/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ const (
AnnLastAppliedStaticFlagsKey = "nebula-graph.io/last-applied-static-flags"
// AnnLastAppliedConfigKey is annotation key to indicate the last applied configuration
AnnLastAppliedConfigKey = "nebula-graph.io/last-applied-configuration"
// AnnPodSchedulingKey is pod scheduling annotation key, it represents whether the pod is scheduling
AnnPodSchedulingKey = "nebula-graph.io/pod-scheduling"
// AnnPodConfigMapHash is pod configmap hash key to update configuration dynamically
AnnPodConfigMapHash = "nebula-graph.io/cm-hash"
// AnnPvReclaimKey is annotation key that indicate whether reclaim persistent volume
AnnPvReclaimKey = "nebula-graph.io/enable-pv-reclaim"

// AnnRestartTimestamp is annotation key to indicate the timestamp that operator restart the workload
AnnRestartTimestamp = "nebula-graph.io/restart-timestamp"
// AnnRestartPodOrdinal is the annotation key to indicate which Pod will be restarted
AnnRestartPodOrdinal = "nebula-graph.io/restart-ordinal"

// AnnRestoreNameKey is restore name annotation key used for creating new nebula cluster with backup data
AnnRestoreNameKey = "nebula-graph.io/restore-name"
// AnnRestoreMetadStepKey is the annotation key to control Metad reconcile process
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/component/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (c *nebulaConsole) syncConsolePod(nc *v1alpha1.NebulaCluster) error {
}
return c.clientSet.Pod().CreatePod(newPod)
}
return updatePod(c.clientSet, newPod, oldPod)
return updateSinglePod(c.clientSet, newPod, oldPod)
}

func (c *nebulaConsole) generatePod(nc *v1alpha1.NebulaCluster) *corev1.Pod {
Expand Down
15 changes: 13 additions & 2 deletions pkg/controller/component/graphd_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error {
return err
}

// TODO: validate the timestamp format
timestamp, ok := oldWorkload.GetAnnotations()[annotation.AnnRestartTimestamp]
if ok {
if err := extender.SetTemplateAnnotations(newWorkload,
map[string]string{annotation.AnnRestartTimestamp: timestamp}); err != nil {
return err
}
}

if err := c.syncNebulaClusterStatus(nc, newWorkload, oldWorkload); err != nil {
return fmt.Errorf("sync graphd cluster status failed: %v", err)
}
Expand All @@ -120,6 +129,9 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error {
return err
}
}
if err := extender.SetRestartTimestamp(newWorkload); err != nil {
return err
}
if err := extender.SetLastAppliedConfigAnnotation(newWorkload); err != nil {
return err
}
Expand Down Expand Up @@ -187,8 +199,7 @@ func (c *graphdCluster) syncNebulaClusterStatus(
lastReplicas = int32(v)
}

if updating &&
nc.Status.Metad.Phase != v1alpha1.UpdatePhase {
if updating && nc.Status.Metad.Phase != v1alpha1.UpdatePhase {
nc.Status.Graphd.Phase = v1alpha1.UpdatePhase
} else if *newReplicas < *oldReplicas || (ok && *newReplicas < lastReplicas) {
nc.Status.Graphd.Phase = v1alpha1.ScaleInPhase
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/component/graphd_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func (g *graphUpdater) Update(
return nil
}

func (g *graphUpdater) RestartPod(nc *v1alpha1.NebulaCluster, ordinal int32) error {
return nil
}

func (g *graphUpdater) updateGraphdPod(ordinal int32, newUnstruct *unstructured.Unstructured, advanced bool) error {
return setPartition(newUnstruct, int64(ordinal), advanced)
}
2 changes: 1 addition & 1 deletion pkg/controller/component/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func podEqual(newPod, oldPod *corev1.Pod) bool {
return false
}

func updatePod(clientSet kube.ClientSet, newPod, oldPod *corev1.Pod) error {
func updateSinglePod(clientSet kube.ClientSet, newPod, oldPod *corev1.Pod) error {
isOrphan := metav1.GetControllerOf(oldPod) == nil
if podEqual(newPod, oldPod) && !isOrphan {
return nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/component/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,7 @@ type ScaleManager interface {
type UpdateManager interface {
// Update updates the cluster
Update(nc *v1alpha1.NebulaCluster, old, new *unstructured.Unstructured, gvk schema.GroupVersionKind) error

// RestartPod restart the specified Pod
RestartPod(nc *v1alpha1.NebulaCluster, ordinal int32) error
}
11 changes: 11 additions & 0 deletions pkg/controller/component/metad_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,22 @@ func (c *metadCluster) syncMetadWorkload(nc *v1alpha1.NebulaCluster) error {
return err
}

timestamp, ok := oldWorkload.GetAnnotations()[annotation.AnnRestartTimestamp]
if ok {
if err := extender.SetTemplateAnnotations(newWorkload,
map[string]string{annotation.AnnRestartTimestamp: timestamp}); err != nil {
return err
}
}

if err := c.syncNebulaClusterStatus(nc, oldWorkload); err != nil {
return fmt.Errorf("sync metad cluster status failed: %v", err)
}

if notExist {
if err := extender.SetRestartTimestamp(newWorkload); err != nil {
return err
}
if err := extender.SetLastAppliedConfigAnnotation(newWorkload); err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/component/metad_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func (m *metadUpdater) Update(
return nil
}

func (m *metadUpdater) RestartPod(nc *v1alpha1.NebulaCluster, ordinal int32) error {
return nil
}

func (m *metadUpdater) updateMetadPod(ordinal int32, newUnstruct *unstructured.Unstructured, advanced bool) error {
return setPartition(newUnstruct, int64(ordinal), advanced)
}
23 changes: 23 additions & 0 deletions pkg/controller/component/storaged_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package component

import (
"fmt"
"strconv"

"github.com/vesoft-inc/nebula-go/v3/nebula/meta"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -123,6 +124,14 @@ func (c *storagedCluster) syncStoragedWorkload(nc *v1alpha1.NebulaCluster) error
return err
}

timestamp, ok := oldWorkload.GetAnnotations()[annotation.AnnRestartTimestamp]
if ok {
if err := extender.SetTemplateAnnotations(newWorkload,
map[string]string{annotation.AnnRestartTimestamp: timestamp}); err != nil {
return err
}
}

if err := c.syncNebulaClusterStatus(nc, oldWorkload); err != nil {
return fmt.Errorf("sync storaged cluster status failed: %v", err)
}
Expand All @@ -133,6 +142,9 @@ func (c *storagedCluster) syncStoragedWorkload(nc *v1alpha1.NebulaCluster) error
return err
}
}
if err := extender.SetRestartTimestamp(newWorkload); err != nil {
return err
}
if err := extender.SetLastAppliedConfigAnnotation(newWorkload); err != nil {
return err
}
Expand Down Expand Up @@ -186,6 +198,17 @@ func (c *storagedCluster) syncStoragedWorkload(nc *v1alpha1.NebulaCluster) error
}
}

oVal, ok := oldWorkload.GetAnnotations()[annotation.AnnRestartPodOrdinal]
if ok {
ordinal, err := strconv.Atoi(oVal)
if err != nil {
return err
}
if err := c.updateManager.RestartPod(nc, int32(ordinal)); err != nil {
return err
}
}

if err := c.syncStoragedPVC(nc); err != nil {
return err
}
Expand Down
58 changes: 58 additions & 0 deletions pkg/controller/component/storaged_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,64 @@ func (s *storagedUpdater) Update(
return s.updateRunningPhase(mc, nc, spaces)
}

func (s *storagedUpdater) RestartPod(nc *v1alpha1.NebulaCluster, ordinal int32) error {
namespace := nc.GetNamespace()
updatePodName := nc.StoragedComponent().GetPodName(ordinal)
options, err := nebula.ClientOptions(nc, nebula.SetIsMeta(true))
if err != nil {
return err
}
endpoints := []string{nc.GetMetadThriftConnAddress()}
mc, err := nebula.NewMetaClient(endpoints, options...)
if err != nil {
return err
}
defer func() {
if err := mc.Disconnect(); err != nil {
klog.Errorf("meta client disconnect failed: %v", err)
}
}()

spaces, err := mc.ListSpaces()
if err != nil {
return err
}
empty := len(spaces) == 0

if empty || nc.IsForceUpdateEnabled() {
return s.clientSet.Pod().DeletePod(namespace, updatePodName)
}

updatePod, err := s.clientSet.Pod().GetPod(namespace, updatePodName)
if err != nil {
klog.Errorf("get pod failed: %v", namespace, updatePodName, err)
return err
}
_, ok := updatePod.Annotations[TransLeaderBeginTime]
if !ok {
if updatePod.Annotations == nil {
updatePod.Annotations = make(map[string]string, 0)
}
now := time.Now().Format(time.RFC3339)
updatePod.Annotations[TransLeaderBeginTime] = now
if err := s.clientSet.Pod().UpdatePod(updatePod); err != nil {
return err
}
klog.Infof("set pod %s annotation %v successfully", updatePod.Name, TransLeaderBeginTime)
}

host := nc.StoragedComponent().GetPodFQDN(ordinal)
if s.readyToUpdate(mc, host, updatePod) {
return s.clientSet.Pod().DeletePod(namespace, updatePodName)
}

if err := s.transLeaderIfNecessary(nc, mc, ordinal); err != nil {
return &utilerrors.ReconcileError{Msg: fmt.Sprintf("%v", err)}
}

return &utilerrors.ReconcileError{Msg: fmt.Sprintf("storaged pod %s is transferring leader", updatePodName)}
}

// nolint: revive
func (s *storagedUpdater) updateStoragedPod(
mc nebula.MetaInterface,
Expand Down
15 changes: 15 additions & 0 deletions pkg/util/extender/unstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"strconv"
"strings"
"time"

apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -223,6 +224,10 @@ func UpdateWorkload(
if ok {
annotations[annotation.AnnLastReplicas] = r
}
t, ok := oldUnstruct.GetAnnotations()[annotation.AnnRestartTimestamp]
if ok {
annotations[annotation.AnnRestartTimestamp] = t
}
w.SetAnnotations(annotations)
var updateStrategy interface{}
newSpec := GetSpec(newUnstruct)
Expand Down Expand Up @@ -290,6 +295,16 @@ func SetLastAppliedConfigAnnotation(obj *unstructured.Unstructured) error {
return nil
}

func SetRestartTimestamp(obj *unstructured.Unstructured) error {
annotations := make(map[string]string)
for k, v := range obj.GetAnnotations() {
annotations[k] = v
}
annotations[annotation.AnnRestartTimestamp] = time.Now().Format(time.RFC3339)
obj.SetAnnotations(annotations)
return nil
}

func SetUpdatePartition(
obj *unstructured.Unstructured,
upgradeOrdinal,
Expand Down