[operator] Cherry pick auto-failover and leader balance commits (#534)
* fix(operator): remove unnecessary pod from autofailover list (#530)

* [operator] run leader balance multiple times until balanced. (#532)

[operator] run leader balance multiple times until balanced.
kevinliu24 authored Nov 13, 2024
1 parent 9a97ee8 commit 96cf048
Showing 5 changed files with 48 additions and 7 deletions.
pkg/controller/component/graphd_cluster.go: 6 additions & 2 deletions
@@ -292,12 +292,16 @@ func (c *graphdCluster) syncNebulaClusterStatus(
return err
}
thriftPort := nc.GraphdComponent().GetPort(v1alpha1.GraphdPortNameThrift)
for i := range hostItems {
host := hostItems[i]
klog.Infof("Current graphd state: %v. Current number of replicas: %v", nc.Status.Graphd.Phase, pointer.Int32Deref(newReplicas, 0))
for _, host := range hostItems {
klog.Infof("Currently looking at host: %v with status %v", strings.Split(host.HostAddr.Host, ".")[0], host.Status)
if host.Status == meta.HostStatus_OFFLINE && host.HostAddr.Port == thriftPort {
podName := strings.Split(host.HostAddr.Host, ".")[0]
ordinal := getPodOrdinal(podName)
if int32(ordinal) >= pointer.Int32Deref(nc.Spec.Graphd.Replicas, 0) {
klog.Infof("graphd pod [%s/%s] has already been terminated by the sts. Skipping failover and/or removing from auto failover list", nc.Namespace, podName)
// delete is a no-op if FailureHosts or podName is nil
delete(nc.Status.Graphd.FailureHosts, podName)
continue
}
if nc.Status.Graphd.FailureHosts == nil {
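The skip logic above compares a pod's ordinal against the desired replica count, so failover is not attempted for pods the StatefulSet has already scaled away. getPodOrdinal itself is not part of this diff; the sketch below is a hypothetical stand-in, assuming StatefulSet pod names of the form <statefulset-name>-<ordinal> (for example nebula-graphd-2):

// Hypothetical sketch only, not necessarily the operator's actual helper:
// parse the trailing ordinal from a StatefulSet pod name, e.g. "nebula-graphd-2" -> 2.
package component

import (
	"strconv"
	"strings"
)

func getPodOrdinal(podName string) int {
	idx := strings.LastIndex(podName, "-")
	if idx < 0 {
		return -1 // no ordinal suffix
	}
	ordinal, err := strconv.Atoi(podName[idx+1:])
	if err != nil {
		return -1 // suffix is not numeric
	}
	return ordinal
}

Under that naming convention, a pod named nebula-graphd-3 in a cluster scaled down to 2 replicas yields ordinal 3 >= replicas 2, so the controller skips failover for it and drops any stale FailureHosts entry.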
pkg/controller/component/metad_cluster.go: 8 additions & 2 deletions
@@ -257,10 +257,16 @@ func (c *metadCluster) syncNebulaClusterStatus(nc *v1alpha1.NebulaCluster, oldWo
return err
}
thriftPort := nc.MetadComponent().GetPort(v1alpha1.MetadPortNameThrift)
for i := range hostItems {
host := hostItems[i]
for _, host := range hostItems {
if host.Status == meta.HostStatus_OFFLINE && host.HostAddr.Port == thriftPort {
podName := strings.Split(host.HostAddr.Host, ".")[0]
ordinal := getPodOrdinal(podName)
if int32(ordinal) >= pointer.Int32Deref(nc.Spec.Metad.Replicas, 0) {
klog.Infof("metad pod [%s/%s] has already been terminated by the sts. Skipping failover and/or removing from auto failover list", nc.Namespace, podName)
// delete is a no-op if FailureHosts or podName is nil
delete(nc.Status.Metad.FailureHosts, podName)
continue
}
if nc.Status.Metad.FailureHosts == nil {
nc.Status.Metad.FailureHosts = make(map[string]v1alpha1.FailureHost)
}
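The "delete is a no-op" comment added in all three controllers relies on a Go guarantee: delete on a nil map does nothing and never panics, so no nil check is needed before the call. A minimal standalone demonstration (the map type is simplified here; the real field is map[string]v1alpha1.FailureHost):

package main

import "fmt"

func main() {
	var failureHosts map[string]string               // nil: no failure hosts recorded yet
	delete(failureHosts, "nebula-metad-3")           // no-op on a nil map, no panic
	fmt.Println(failureHosts == nil, len(failureHosts)) // prints: true 0
}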
pkg/controller/component/storaged_cluster.go: 4 additions & 2 deletions
@@ -335,12 +335,14 @@ func (c *storagedCluster) syncNebulaClusterStatus(
return err
}
thriftPort := nc.StoragedComponent().GetPort(v1alpha1.StoragedPortNameThrift)
for i := range hostItems {
host := hostItems[i]
for _, host := range hostItems {
if host.Status == meta.HostStatus_OFFLINE && host.HostAddr.Port == thriftPort {
podName := strings.Split(host.HostAddr.Host, ".")[0]
ordinal := getPodOrdinal(podName)
if int32(ordinal) >= pointer.Int32Deref(nc.Spec.Storaged.Replicas, 0) {
klog.Infof("storaged pod [%s/%s] has already been terminated by the sts. Skipping failover and/or removing from auto failover list", nc.Namespace, podName)
// delete is a no-op if FailureHosts or podName is nil
delete(nc.Status.Storaged.FailureHosts, podName)
continue
}
if nc.Status.Storaged.FailureHosts == nil {
pkg/controller/component/storaged_updater.go: 11 additions & 1 deletion
@@ -467,10 +467,20 @@ func (s *storagedUpdater) balanceLeader(mc nebula.MetaInterface, nc *v1alpha1.Ne
if time.Now().Before(lastBalancedTime.Add(BalanceLeaderInterval * time.Second)) {
return utilerrors.ReconcileErrorf("partition leader is balancing")
}

balanced, err := mc.IsLeaderBalanced(space.Name)
if err != nil {
return utilerrors.ReconcileErrorf("failed to check if the leader is balanced for space %s: %v", space.Name, err)
}

if balanced {
nc.Status.Storaged.BalancedSpaces = append(nc.Status.Storaged.BalancedSpaces, *space.Id.SpaceID)
continue
}

if err := mc.BalanceLeader(*space.Id.SpaceID); err != nil {
return err
}
nc.Status.Storaged.BalancedSpaces = append(nc.Status.Storaged.BalancedSpaces, *space.Id.SpaceID)
nc.Status.Storaged.LastBalancedTime = &metav1.Time{Time: time.Now()}
return utilerrors.ReconcileErrorf("space %d need to be synced", *space.Id.SpaceID)
}
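This hunk implements the "run leader balance multiple times until balanced" behavior from #532: balanceLeader now checks IsLeaderBalanced per space, records already-balanced spaces, and keeps returning a reconcile error (which requeues the cluster) until every space reports balanced leaders. A simplified, hypothetical sketch of that requeue-until-balanced control flow; the names below are illustrative and not the operator's actual reconciler:

package main

import "fmt"

// reconcileUntilBalanced models the controller loop: balanceOnce stands in for
// storagedUpdater.balanceLeader, and returning requeue=true models the
// ReconcileError path that schedules another reconcile pass.
func reconcileUntilBalanced(balanceOnce func() (requeue bool, err error)) error {
	for pass := 1; ; pass++ {
		requeue, err := balanceOnce()
		if err != nil {
			return err
		}
		if !requeue {
			fmt.Printf("leaders balanced after %d pass(es)\n", pass)
			return nil
		}
		// still unbalanced: the real controller requeues and tries again later
	}
}

func main() {
	passes := 0
	_ = reconcileUntilBalanced(func() (bool, error) {
		passes++
		return passes < 3, nil // pretend leaders settle after three passes
	})
}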
pkg/nebula/meta_client.go: 19 additions & 0 deletions
@@ -53,6 +53,7 @@ type (
GetSpaceLeaderHosts(space []byte) ([]string, error)
GetLeaderCount(leaderHost string) (int, error)
BalanceStatus(jobID int32, spaceID nebula.GraphSpaceID) error
IsLeaderBalanced(spaceName []byte) (bool, error)
BalanceLeader(spaceID nebula.GraphSpaceID) error
BalanceData(spaceID nebula.GraphSpaceID) (int32, error)
BalanceDataInZone(spaceID nebula.GraphSpaceID) (int32, error)
@@ -356,6 +357,24 @@ func (m *metaClient) BalanceLeader(spaceID nebula.GraphSpaceID) error {
return nil
}

func (m *metaClient) IsLeaderBalanced(spaceName []byte) (bool, error) {
hosts, err := m.ListHosts(meta.ListHostType_ALLOC)
if err != nil {
return false, err
}

for _, host := range hosts {
if host.Status != meta.HostStatus_ONLINE {
continue
}
if host.LeaderParts[(string)(spaceName)] == nil {
return false, nil
}
}

return true, nil
}

func (m *metaClient) runAdminJob(req *meta.AdminJobReq) (int32, error) {
resp, err := m.retryOnError(req, func(req interface{}) (interface{}, error) {
resp, err := m.client.RunAdminJob(req.(*meta.AdminJobReq))
