Skip to content

Commit

Permalink
balance leader after restored cluster status ready (#504)
Browse files Browse the repository at this point in the history
  • Loading branch information
MegaByte875 authored May 28, 2024
1 parent b43e203 commit a02c996
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/component/storaged_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (ss *storageScaler) ScaleIn(nc *v1alpha1.NebulaCluster, oldReplicas, newRep
return err
}
if isPodPending(pod) {
klog.Infof("skip host for pod [%s/%s] status is Pending", pod.Namespace, pod.Name)
klog.Infof("skip host for pod [%s/%s] status is Pending", pod.Namespace, pod.Name)
continue
}
host := nc.StoragedComponent().GetPodFQDN(i)
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/component/storaged_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"

nebulago "github.com/vesoft-inc/nebula-go/v3/nebula"
"github.com/vesoft-inc/nebula-go/v3/nebula/meta"
Expand Down Expand Up @@ -61,7 +62,7 @@ func (s *storagedUpdater) Update(
oldUnstruct, newUnstruct *unstructured.Unstructured,
gvk schema.GroupVersionKind,
) error {
if *nc.Spec.Storaged.Replicas == int32(0) {
if pointer.Int32Deref(nc.Spec.Storaged.Replicas, 0) == 0 {
return nil
}

Expand Down Expand Up @@ -148,7 +149,7 @@ func (s *storagedUpdater) RestartPod(nc *v1alpha1.NebulaCluster, ordinal int32)
}
empty := len(spaces) == 0

if empty || *nc.Spec.Storaged.Replicas < 3 || nc.IsForceUpdateEnabled() {
if empty || pointer.Int32Deref(nc.Spec.Storaged.Replicas, 0) < 3 || nc.IsForceUpdateEnabled() {
return s.clientSet.Pod().DeletePod(namespace, updatePodName, false)
}

Expand Down Expand Up @@ -235,7 +236,7 @@ func (s *storagedUpdater) updateStoragedPod(
return err
}

if empty || *nc.Spec.Storaged.Replicas < 3 || nc.IsForceUpdateEnabled() {
if empty || pointer.Int32Deref(nc.Spec.Storaged.Replicas, 0) < 3 || nc.IsForceUpdateEnabled() {
return setPartition(newUnstruct, int64(ordinal), advanced)
}

Expand Down Expand Up @@ -428,7 +429,7 @@ func (s *storagedUpdater) transLeader(
}

func (s *storagedUpdater) updateRunningPhase(mc nebula.MetaInterface, nc *v1alpha1.NebulaCluster, spaces []*meta.IdName) error {
if len(spaces) == 0 || *nc.Spec.Storaged.Replicas == 1 {
if len(spaces) == 0 || pointer.Int32Deref(nc.Spec.Storaged.Replicas, 0) == 1 {
nc.Status.Storaged.Phase = v1alpha1.RunningPhase
return nil
}
Expand Down
68 changes: 62 additions & 6 deletions pkg/controller/nebularestore/nebula_restore_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,37 @@ func (rm *restoreManager) syncRestoreProcess(nr *v1alpha1.NebulaRestore) error {
return err
}

options, err := nebula.ClientOptions(original, nebula.SetIsMeta(true))
if err != nil {
return err
}

restored := rm.genNebulaCluster(restoredName, nr, original)

nc, _ := rm.clientSet.NebulaCluster().GetNebulaCluster(ns, restoredName)
if nc != nil && annotation.IsInRestoreStage2(nc.Annotations) {
if !nc.IsReady() {
return utilerrors.ReconcileErrorf("restoring [%s/%s] in stage2, waiting for cluster ready", ns, restoredName)
}

mc, err := nebula.NewMetaClient([]string{restored.GetMetadThriftConnAddress()}, options...)
if err != nil {
return err
}
defer func() {
if err := mc.Disconnect(); err != nil {
klog.Errorf("meta client disconnect failed: %v", err)
}
}()

if err := balanceLeader(mc, nc); err != nil {
return err
}

nc.Annotations = nil
nc.Spec.Storaged.EnableForceUpdate = nil
nc.Spec.EnableAutoFailover = original.Spec.EnableAutoFailover
nc.Status.Storaged.BalancedSpaces = nil
if err := rm.clientSet.NebulaCluster().UpdateNebulaCluster(nc); err != nil {
return fmt.Errorf("remove cluster [%s/%s] annotations failed: %v", ns, restoredName, err)
}
Expand All @@ -138,12 +161,6 @@ func (rm *restoreManager) syncRestoreProcess(nr *v1alpha1.NebulaRestore) error {
})
}

options, err := nebula.ClientOptions(original, nebula.SetIsMeta(true))
if err != nil {
return err
}

restored := rm.genNebulaCluster(restoredName, nr, original)
if err := rm.clientSet.NebulaCluster().CreateNebulaCluster(restored); err != nil {
return err
}
Expand Down Expand Up @@ -303,6 +320,11 @@ func (rm *restoreManager) loadCluster(original, restored *v1alpha1.NebulaCluster
if err != nil {
return err
}
defer func() {
if err := mc.Disconnect(); err != nil {
klog.Errorf("meta client disconnect failed: %v", err)
}
}()

resp, err := mc.ListCluster()
if err != nil {
Expand Down Expand Up @@ -834,3 +856,37 @@ func getPodTerminateReason(pod corev1.Pod) string {
}
return ""
}

func balanceLeader(mc nebula.MetaInterface, nc *v1alpha1.NebulaCluster) error {
spaces, err := mc.ListSpaces()
if err != nil {
return err
}
if len(spaces) == 0 || pointer.Int32Deref(nc.Spec.Storaged.Replicas, 0) == 1 {
return nil
}

if nc.Status.Storaged.BalancedSpaces == nil {
nc.Status.Storaged.BalancedSpaces = make([]int32, 0, len(spaces))
}

contains := func(ss []int32, lookingFor int32) bool {
for _, s := range ss {
if lookingFor == s {
return true
}
}
return false
}

for _, space := range spaces {
if contains(nc.Status.Storaged.BalancedSpaces, *space.Id.SpaceID) {
continue
}
if err := mc.BalanceLeader(*space.Id.SpaceID); err != nil {
return err
}
nc.Status.Storaged.BalancedSpaces = append(nc.Status.Storaged.BalancedSpaces, *space.Id.SpaceID)
}
return nil
}

0 comments on commit a02c996

Please sign in to comment.