diff --git a/apis/apps/v1alpha1/nebulacluster.go b/apis/apps/v1alpha1/nebulacluster.go index 345b3761..400fd4d0 100644 --- a/apis/apps/v1alpha1/nebulacluster.go +++ b/apis/apps/v1alpha1/nebulacluster.go @@ -110,6 +110,10 @@ func (nc *NebulaCluster) ConcurrentTransfer() bool { return pointer.BoolDeref(nc.Spec.Storaged.ConcurrentTransfer, false) } +func (nc *NebulaCluster) IsAutoFailoverEnabled() bool { + return pointer.BoolDeref(nc.Spec.EnableAutoFailover, false) +} + func (nc *NebulaCluster) IsBREnabled() bool { return pointer.BoolDeref(nc.Spec.EnableBR, false) } diff --git a/apis/apps/v1alpha1/nebulacluster_common.go b/apis/apps/v1alpha1/nebulacluster_common.go index beca338f..a777e42a 100644 --- a/apis/apps/v1alpha1/nebulacluster_common.go +++ b/apis/apps/v1alpha1/nebulacluster_common.go @@ -23,8 +23,7 @@ import ( "strconv" "strings" - kruisepub "github.com/openkruise/kruise-api/apps/pub" - kruisev1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1" + kruisev1beta1 "github.com/openkruise/kruise-api/apps/v1beta1" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -201,23 +200,23 @@ func rollingUpdateDone(workloadStatus *WorkloadStatus) bool { workloadStatus.CurrentRevision == workloadStatus.UpdateRevision } -func upgradeStatefulSet(sts *appsv1.StatefulSet) (*kruisev1alpha1.StatefulSet, error) { +func upgradeStatefulSet(sts *appsv1.StatefulSet) (*kruisev1beta1.StatefulSet, error) { data, err := json.Marshal(sts) if err != nil { return nil, err } - newSts := &kruisev1alpha1.StatefulSet{} + newSts := &kruisev1beta1.StatefulSet{} if err := json.Unmarshal(data, newSts); err != nil { return nil, err } - newSts.TypeMeta.APIVersion = kruisev1alpha1.SchemeGroupVersion.String() - newSts.Spec.Template.Spec.ReadinessGates = []corev1.PodReadinessGate{ - { - ConditionType: kruisepub.InPlaceUpdateReady, - }, - } - newSts.Spec.UpdateStrategy.RollingUpdate.PodUpdatePolicy = kruisev1alpha1.InPlaceIfPossiblePodUpdateStrategyType + newSts.TypeMeta.APIVersion = kruisev1beta1.SchemeGroupVersion.String() + //newSts.Spec.Template.Spec.ReadinessGates = []corev1.PodReadinessGate{ + // { + // ConditionType: kruisepub.InPlaceUpdateReady, + // }, + //} + newSts.Spec.UpdateStrategy.RollingUpdate.PodUpdatePolicy = kruisev1beta1.RecreatePodUpdateStrategyType return newSts, nil } @@ -417,7 +416,7 @@ echo "export NODE_ZONE=${NODE_ZONE}" > /node/zone image = pointer.StringDeref(nc.Spec.AlpineImage, "") } - return corev1.Container{ + container := corev1.Container{ Name: "node-labels", Image: image, Command: []string{"/bin/sh", "-c"}, @@ -451,6 +450,12 @@ echo "export NODE_ZONE=${NODE_ZONE}" > /node/zone }, }, } + + imagePullPolicy := nc.Spec.ImagePullPolicy + if imagePullPolicy != nil { + container.ImagePullPolicy = *imagePullPolicy + } + return container } func generateInitContainers(c NebulaClusterComponent) []corev1.Container { @@ -705,7 +710,7 @@ func generateWorkload( if err != nil { return nil, err } - case kruisev1alpha1.SchemeGroupVersion.WithKind("StatefulSet").String(): + case kruisev1beta1.SchemeGroupVersion.WithKind("StatefulSet").String(): var set *appsv1.StatefulSet set, err = generateStatefulSet(c, cm) if err != nil { diff --git a/apis/apps/v1alpha1/nebulacluster_component.go b/apis/apps/v1alpha1/nebulacluster_component.go index 2af69e09..6b3266fb 100644 --- a/apis/apps/v1alpha1/nebulacluster_component.go +++ b/apis/apps/v1alpha1/nebulacluster_component.go @@ -209,6 +209,7 @@ type NebulaClusterComponent interface { GetPhase() ComponentPhase IsSuspending() bool IsSuspended() bool + IsAutoFailovering() bool SetWorkloadStatus(status *WorkloadStatus) UpdateComponentStatus(status *ComponentStatus) } diff --git a/apis/apps/v1alpha1/nebulacluster_graphd.go b/apis/apps/v1alpha1/nebulacluster_graphd.go index 56b10c3b..3b10f5ee 100644 --- a/apis/apps/v1alpha1/nebulacluster_graphd.go +++ b/apis/apps/v1alpha1/nebulacluster_graphd.go @@ -346,3 +346,8 @@ func (c *graphdComponent) IsSuspended() bool { } return true } + +func (c *graphdComponent) IsAutoFailovering() bool { + //TODO implement me + return false +} diff --git a/apis/apps/v1alpha1/nebulacluster_metad.go b/apis/apps/v1alpha1/nebulacluster_metad.go index 9108fb7e..c9fb6723 100644 --- a/apis/apps/v1alpha1/nebulacluster_metad.go +++ b/apis/apps/v1alpha1/nebulacluster_metad.go @@ -424,3 +424,8 @@ func (c *metadComponent) IsSuspended() bool { } return true } + +func (c *metadComponent) IsAutoFailovering() bool { + //TODO implement me + return false +} diff --git a/apis/apps/v1alpha1/nebulacluster_storaged.go b/apis/apps/v1alpha1/nebulacluster_storaged.go index c2109867..21eb7395 100644 --- a/apis/apps/v1alpha1/nebulacluster_storaged.go +++ b/apis/apps/v1alpha1/nebulacluster_storaged.go @@ -404,6 +404,18 @@ func (c *storagedComponent) IsSuspended() bool { return true } +func (c *storagedComponent) IsAutoFailovering() bool { + if len(c.nc.Status.Storaged.FailureHosts) == 0 { + return false + } + for _, failureHost := range c.nc.Status.Storaged.FailureHosts { + if !failureHost.ConfirmationTime.IsZero() { + return true + } + } + return false +} + func storageDataVolumeClaims(storageClaims []StorageClaim, componentType string) ([]corev1.PersistentVolumeClaim, error) { var pvcs []corev1.PersistentVolumeClaim for i := range storageClaims { diff --git a/apis/apps/v1alpha1/nebulacluster_types.go b/apis/apps/v1alpha1/nebulacluster_types.go index 368ac85b..fe0d2802 100644 --- a/apis/apps/v1alpha1/nebulacluster_types.go +++ b/apis/apps/v1alpha1/nebulacluster_types.go @@ -19,6 +19,7 @@ package v1alpha1 import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) // NebulaClusterConditionType represents a nebula cluster condition value. @@ -95,6 +96,13 @@ type NebulaClusterSpec struct { // +optional EnableBR *bool `json:"enableBR,omitempty"` + // Flag to enable/disable auto fail over in use local PV scenario, default false. + // +optional + EnableAutoFailover *bool `json:"enableAutoFailover,omitempty"` + + // +optional + FailoverPeriod metav1.Duration `json:"failoverPeriod,omitempty"` + // +optional LogRotate *LogRotate `json:"logRotate,omitempty"` @@ -134,11 +142,13 @@ type ComponentStatus struct { // StoragedStatus describes the status and version of nebula storaged. type StoragedStatus struct { - ComponentStatus `json:",inline"` - HostsAdded bool `json:"hostsAdded,omitempty"` - RemovedSpaces []int32 `json:"removedSpaces,omitempty"` - BalancedSpaces []int32 `json:"balancedSpaces,omitempty"` - LastBalanceJob *BalanceJob `json:"lastBalanceJob,omitempty"` + ComponentStatus `json:",inline"` + HostsAdded bool `json:"hostsAdded,omitempty"` + RemovedSpaces []int32 `json:"removedSpaces,omitempty"` + BalancedSpaces []int32 `json:"balancedSpaces,omitempty"` + LastBalanceJob *BalanceJob `json:"lastBalanceJob,omitempty"` + BalancedAfterFailover *bool `json:"balancedAfterFailover,omitempty"` + FailureHosts map[string]StoragedFailureHost `json:"failureHosts,omitempty"` } // BalanceJob describes the admin job for balance data. @@ -147,6 +157,21 @@ type BalanceJob struct { JobID int32 `json:"jobID,omitempty"` } +type EmptyStruct struct{} + +// StoragedFailureHost is the storaged failure host information. +type StoragedFailureHost struct { + Host string `json:"host,omitempty"` + PVCSet map[types.UID]EmptyStruct `json:"pvcSet,omitempty"` + HostDeleted bool `json:"hostDeleted,omitempty"` + PodRestarted bool `json:"podRestarted,omitempty"` + PodRebuilt bool `json:"podRebuilt,omitempty"` + NodeDown bool `json:"nodeDown,omitempty"` + CreationTime metav1.Time `json:"creationTime,omitempty"` + ConfirmationTime metav1.Time `json:"confirmationTime,omitempty"` + DeletionTime metav1.Time `json:"deletionTime,omitempty"` +} + // WorkloadStatus describes the status of a specified workload. type WorkloadStatus struct { // ObservedGeneration is the most recent generation observed for this Workload. It corresponds to the diff --git a/apis/apps/v1alpha1/zz_generated.deepcopy.go b/apis/apps/v1alpha1/zz_generated.deepcopy.go index 31efa017..1c08e2f2 100644 --- a/apis/apps/v1alpha1/zz_generated.deepcopy.go +++ b/apis/apps/v1alpha1/zz_generated.deepcopy.go @@ -25,6 +25,7 @@ import ( "github.com/vesoft-inc/nebula-go/v3/nebula" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. @@ -229,6 +230,21 @@ func (in *ConsoleSpec) DeepCopy() *ConsoleSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EmptyStruct) DeepCopyInto(out *EmptyStruct) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EmptyStruct. +func (in *EmptyStruct) DeepCopy() *EmptyStruct { + if in == nil { + return nil + } + out := new(EmptyStruct) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ExporterSpec) DeepCopyInto(out *ExporterSpec) { *out = *in @@ -526,6 +542,12 @@ func (in *NebulaClusterSpec) DeepCopyInto(out *NebulaClusterSpec) { *out = new(bool) **out = **in } + if in.EnableAutoFailover != nil { + in, out := &in.EnableAutoFailover, &out.EnableAutoFailover + *out = new(bool) + **out = **in + } + out.FailoverPeriod = in.FailoverPeriod if in.LogRotate != nil { in, out := &in.LogRotate, &out.LogRotate *out = new(LogRotate) @@ -873,6 +895,31 @@ func (in *StorageProvider) DeepCopy() *StorageProvider { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StoragedFailureHost) DeepCopyInto(out *StoragedFailureHost) { + *out = *in + if in.PVCSet != nil { + in, out := &in.PVCSet, &out.PVCSet + *out = make(map[types.UID]EmptyStruct, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + in.CreationTime.DeepCopyInto(&out.CreationTime) + in.ConfirmationTime.DeepCopyInto(&out.ConfirmationTime) + in.DeletionTime.DeepCopyInto(&out.DeletionTime) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StoragedFailureHost. +func (in *StoragedFailureHost) DeepCopy() *StoragedFailureHost { + if in == nil { + return nil + } + out := new(StoragedFailureHost) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *StoragedSpec) DeepCopyInto(out *StoragedSpec) { *out = *in @@ -947,6 +994,18 @@ func (in *StoragedStatus) DeepCopyInto(out *StoragedStatus) { *out = new(BalanceJob) **out = **in } + if in.BalancedAfterFailover != nil { + in, out := &in.BalancedAfterFailover, &out.BalancedAfterFailover + *out = new(bool) + **out = **in + } + if in.FailureHosts != nil { + in, out := &in.FailureHosts, &out.FailureHosts + *out = make(map[string]StoragedFailureHost, len(*in)) + for key, val := range *in { + (*out)[key] = *val.DeepCopy() + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StoragedStatus. diff --git a/charts/nebula-operator/crds/nebulaclusters.yaml b/charts/nebula-operator/crds/nebulaclusters.yaml index bfbcfbe3..ecc77efe 100644 --- a/charts/nebula-operator/crds/nebulaclusters.yaml +++ b/charts/nebula-operator/crds/nebulaclusters.yaml @@ -482,6 +482,8 @@ spec: version: type: string type: object + enableAutoFailover: + type: boolean enableBR: type: boolean enablePVReclaim: @@ -3184,6 +3186,8 @@ spec: required: - image type: object + failoverPeriod: + type: string graphd: properties: affinity: @@ -11718,11 +11722,41 @@ spec: type: integer storaged: properties: + balancedAfterFailover: + type: boolean balancedSpaces: items: format: int32 type: integer type: array + failureHosts: + additionalProperties: + properties: + confirmationTime: + format: date-time + type: string + creationTime: + format: date-time + type: string + deletionTime: + format: date-time + type: string + host: + type: string + hostDeleted: + type: boolean + nodeDown: + type: boolean + podRebuilt: + type: boolean + podRestarted: + type: boolean + pvcSet: + additionalProperties: + type: object + type: object + type: object + type: object hostsAdded: type: boolean lastBalanceJob: diff --git a/cmd/autoscaler/app/options/options.go b/cmd/autoscaler/app/options/options.go index 3ffdc2e8..a92a24fc 100644 --- a/cmd/autoscaler/app/options/options.go +++ b/cmd/autoscaler/app/options/options.go @@ -84,7 +84,7 @@ func NewOptions() *Options { } func (o *Options) AddFlags(flags *pflag.FlagSet) { - flags.BoolVar(&o.LeaderElection.LeaderElect, "leader-elect", true, "Start a leader election client and gain leadership before executing the main loop. Enable this when running replicated components for high availability.") + flags.BoolVar(&o.LeaderElection.LeaderElect, "leader-elect", false, "Start a leader election client and gain leadership before executing the main loop. Enable this when running replicated components for high availability.") flags.StringVar(&o.LeaderElection.ResourceNamespace, "leader-elect-resource-namespace", NamespaceNebulaSystem, "The namespace of resource object that is used for locking during leader election.") flags.DurationVar(&o.LeaderElection.LeaseDuration.Duration, "leader-elect-lease-duration", defaultElectionLeaseDuration.Duration, ""+ "The duration that non-leader candidates will wait after observing a leadership "+ diff --git a/cmd/controller-manager/app/controller-manager.go b/cmd/controller-manager/app/controller-manager.go index d4230d3f..9ecf108e 100644 --- a/cmd/controller-manager/app/controller-manager.go +++ b/cmd/controller-manager/app/controller-manager.go @@ -21,7 +21,7 @@ import ( "flag" "net/http" - kruiseapi "github.com/openkruise/kruise-api/apps/v1alpha1" + kruisev1beta1 "github.com/openkruise/kruise-api/apps/v1beta1" "github.com/spf13/cobra" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -92,7 +92,8 @@ func Run(ctx context.Context, opts *options.Options) error { profileflag.ListenAndServe(opts.ProfileOpts) if opts.EnableKruiseScheme { - utilruntime.Must(kruiseapi.AddToScheme(scheme)) + utilruntime.Must(kruisev1beta1.AddToScheme(clientgoscheme.Scheme)) + utilruntime.Must(kruisev1beta1.AddToScheme(scheme)) klog.Info("register openkruise scheme") } diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index 5fe6acfa..b7aa2680 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -93,7 +93,7 @@ func NewOptions() *Options { } func (o *Options) AddFlags(flags *pflag.FlagSet) { - flags.BoolVar(&o.LeaderElection.LeaderElect, "leader-elect", true, "Start a leader election client and gain leadership before executing the main loop. Enable this when running replicated components for high availability.") + flags.BoolVar(&o.LeaderElection.LeaderElect, "leader-elect", false, "Start a leader election client and gain leadership before executing the main loop. Enable this when running replicated components for high availability.") flags.StringVar(&o.LeaderElection.ResourceNamespace, "leader-elect-resource-namespace", NamespaceNebulaSystem, "The namespace of resource object that is used for locking during leader election.") flags.DurationVar(&o.LeaderElection.LeaseDuration.Duration, "leader-elect-lease-duration", defaultElectionLeaseDuration.Duration, ""+ "The duration that non-leader candidates will wait after observing a leadership "+ diff --git a/config/crd/bases/apps.nebula-graph.io_nebulaclusters.yaml b/config/crd/bases/apps.nebula-graph.io_nebulaclusters.yaml index bfbcfbe3..ecc77efe 100644 --- a/config/crd/bases/apps.nebula-graph.io_nebulaclusters.yaml +++ b/config/crd/bases/apps.nebula-graph.io_nebulaclusters.yaml @@ -482,6 +482,8 @@ spec: version: type: string type: object + enableAutoFailover: + type: boolean enableBR: type: boolean enablePVReclaim: @@ -3184,6 +3186,8 @@ spec: required: - image type: object + failoverPeriod: + type: string graphd: properties: affinity: @@ -11718,11 +11722,41 @@ spec: type: integer storaged: properties: + balancedAfterFailover: + type: boolean balancedSpaces: items: format: int32 type: integer type: array + failureHosts: + additionalProperties: + properties: + confirmationTime: + format: date-time + type: string + creationTime: + format: date-time + type: string + deletionTime: + format: date-time + type: string + host: + type: string + hostDeleted: + type: boolean + nodeDown: + type: boolean + podRebuilt: + type: boolean + podRestarted: + type: boolean + pvcSet: + additionalProperties: + type: object + type: object + type: object + type: object hostsAdded: type: boolean lastBalanceJob: diff --git a/pkg/controller/component/data_balancer.go b/pkg/controller/component/data_balancer.go new file mode 100644 index 00000000..de9020b5 --- /dev/null +++ b/pkg/controller/component/data_balancer.go @@ -0,0 +1,140 @@ +/* +Copyright 2023 Vesoft Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package component + +import ( + nebulago "github.com/vesoft-inc/nebula-go/v3/nebula" + "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" + "github.com/vesoft-inc/nebula-operator/pkg/kube" + "github.com/vesoft-inc/nebula-operator/pkg/nebula" + "k8s.io/klog/v2" +) + +func balanceSpace(clientSet kube.ClientSet, mc nebula.MetaInterface, nc *v1alpha1.NebulaCluster, spaceID nebulago.GraphSpaceID) error { + if nc.Status.Storaged.LastBalanceJob != nil && nc.Status.Storaged.LastBalanceJob.SpaceID == spaceID { + if err := mc.BalanceStatus(nc.Status.Storaged.LastBalanceJob.JobID, nc.Status.Storaged.LastBalanceJob.SpaceID); err != nil { + if err == nebula.ErrJobStatusFailed { + klog.Infof("space %d balance job %d will be recovered", nc.Status.Storaged.LastBalanceJob.SpaceID, nc.Status.Storaged.LastBalanceJob.JobID) + if nc.IsZoneEnabled() { + return mc.RecoverInZoneBalanceJob(nc.Status.Storaged.LastBalanceJob.JobID, nc.Status.Storaged.LastBalanceJob.SpaceID) + } + return mc.RecoverDataBalanceJob(nc.Status.Storaged.LastBalanceJob.JobID, nc.Status.Storaged.LastBalanceJob.SpaceID) + } + return err + } + if err := mc.BalanceLeader(nc.Status.Storaged.LastBalanceJob.SpaceID); err != nil { + return err + } + nc.Status.Storaged.BalancedSpaces = append(nc.Status.Storaged.BalancedSpaces, nc.Status.Storaged.LastBalanceJob.SpaceID) + return clientSet.NebulaCluster().UpdateNebulaClusterStatus(nc) + } + + namespace := nc.GetNamespace() + componentName := nc.StoragedComponent().GetName() + if nc.IsZoneEnabled() { + jobID, err := mc.BalanceDataInZone(spaceID) + if err != nil { + klog.Errorf("storaged cluster [%s/%s] balance data in zone error: %v", namespace, componentName, err) + if jobID > 0 { + nc.Status.Storaged.LastBalanceJob = &v1alpha1.BalanceJob{ + SpaceID: spaceID, + JobID: jobID, + } + if err := clientSet.NebulaCluster().UpdateNebulaClusterStatus(nc); err != nil { + return err + } + } + return err + } + return nil + } + + jobID, err := mc.BalanceData(spaceID) + if err != nil { + klog.Errorf("storaged cluster [%s/%s] balance data across zone error: %v", namespace, componentName, err) + if jobID > 0 { + nc.Status.Storaged.LastBalanceJob = &v1alpha1.BalanceJob{ + SpaceID: spaceID, + JobID: jobID, + } + if err := clientSet.NebulaCluster().UpdateNebulaClusterStatus(nc); err != nil { + return err + } + } + return err + } + return nil +} + +func removeHost( + clientSet kube.ClientSet, + mc nebula.MetaInterface, + nc *v1alpha1.NebulaCluster, + spaceID nebulago.GraphSpaceID, + hosts []*nebulago.HostAddr, +) error { + if nc.Status.Storaged.LastBalanceJob != nil && nc.Status.Storaged.LastBalanceJob.SpaceID == spaceID { + if err := mc.BalanceStatus(nc.Status.Storaged.LastBalanceJob.JobID, nc.Status.Storaged.LastBalanceJob.SpaceID); err != nil { + if err == nebula.ErrJobStatusFailed { + klog.Infof("space %d balance job %d will be recovered", nc.Status.Storaged.LastBalanceJob.SpaceID, nc.Status.Storaged.LastBalanceJob.JobID) + if nc.IsZoneEnabled() { + return mc.RecoverInZoneBalanceJob(nc.Status.Storaged.LastBalanceJob.JobID, nc.Status.Storaged.LastBalanceJob.SpaceID) + } + return mc.RecoverDataBalanceJob(nc.Status.Storaged.LastBalanceJob.JobID, nc.Status.Storaged.LastBalanceJob.SpaceID) + } + return err + } + nc.Status.Storaged.RemovedSpaces = append(nc.Status.Storaged.RemovedSpaces, nc.Status.Storaged.LastBalanceJob.SpaceID) + return clientSet.NebulaCluster().UpdateNebulaClusterStatus(nc) + } + + namespace := nc.GetNamespace() + componentName := nc.StoragedComponent().GetName() + if nc.IsZoneEnabled() { + jobID, err := mc.RemoveHostInZone(spaceID, hosts) + klog.Errorf("storaged cluster [%s/%s] remove host in zone error: %v", namespace, componentName, err) + if err != nil { + if jobID > 0 { + nc.Status.Storaged.LastBalanceJob = &v1alpha1.BalanceJob{ + SpaceID: spaceID, + JobID: jobID, + } + if err := clientSet.NebulaCluster().UpdateNebulaClusterStatus(nc); err != nil { + return err + } + } + return err + } + return nil + } + + jobID, err := mc.RemoveHost(spaceID, hosts) + if err != nil { + klog.Errorf("storaged cluster [%s/%s] remove host across zone error: %v", namespace, componentName, err) + if jobID > 0 { + nc.Status.Storaged.LastBalanceJob = &v1alpha1.BalanceJob{ + SpaceID: spaceID, + JobID: jobID, + } + if err := clientSet.NebulaCluster().UpdateNebulaClusterStatus(nc); err != nil { + return err + } + } + return err + } + return nil +} diff --git a/pkg/controller/component/graphd_cluster.go b/pkg/controller/component/graphd_cluster.go index 1176e2fc..383e2017 100644 --- a/pkg/controller/component/graphd_cluster.go +++ b/pkg/controller/component/graphd_cluster.go @@ -110,12 +110,14 @@ func (c *graphdCluster) syncGraphdWorkload(nc *v1alpha1.NebulaCluster) error { return err } - // TODO: validate the timestamp format - timestamp, ok := oldWorkload.GetAnnotations()[annotation.AnnRestartTimestamp] - if ok { - if err := extender.SetTemplateAnnotations(newWorkload, - map[string]string{annotation.AnnRestartTimestamp: timestamp}); err != nil { - return err + if !notExist { + // TODO: validate the timestamp format + timestamp, ok := oldWorkload.GetAnnotations()[annotation.AnnRestartTimestamp] + if ok { + if err := extender.SetTemplateAnnotations(newWorkload, + map[string]string{annotation.AnnRestartTimestamp: timestamp}); err != nil { + return err + } } } diff --git a/pkg/controller/component/helper.go b/pkg/controller/component/helper.go index 9bd38a00..50912c3a 100644 --- a/pkg/controller/component/helper.go +++ b/pkg/controller/component/helper.go @@ -29,6 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/klog/v2" + podutils "k8s.io/kubernetes/pkg/api/v1/pod" "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" "github.com/vesoft-inc/nebula-operator/apis/pkg/annotation" @@ -485,17 +486,13 @@ func updateSinglePod(clientSet kube.ClientSet, newPod, oldPod *corev1.Pod) error return err } - if err := clientSet.Pod().DeletePod(oldPod.Namespace, oldPod.Name); err != nil { + if err := clientSet.Pod().DeletePod(oldPod.Namespace, oldPod.Name, false); err != nil { return err } return clientSet.Pod().CreatePod(newPod) } -func isPending(pod *corev1.Pod) bool { - return pod.Status.Phase == corev1.PodPending -} - func syncPVC( component v1alpha1.NebulaClusterComponent, pvcClient kube.PersistentVolumeClaim) error { @@ -579,3 +576,33 @@ func canSuspendComponent(component v1alpha1.NebulaClusterComponent) (bool, strin } return true, "" } + +func isPodPending(pod *corev1.Pod) bool { + return pod.Status.Phase == corev1.PodPending +} + +func isPodHealthy(pod *corev1.Pod) bool { + return isPodRunningAndReady(pod) && !isPodTerminating(pod) +} + +func isPodRunningAndReady(pod *corev1.Pod) bool { + return pod.Status.Phase == corev1.PodRunning && podutils.IsPodReady(pod) +} + +func isPodTerminating(pod *corev1.Pod) bool { + return pod.DeletionTimestamp != nil +} + +func isPodConditionScheduledTrue(conditions []corev1.PodCondition) bool { + podSchCond := getPodConditionFromList(conditions, corev1.PodScheduled) + return podSchCond != nil && podSchCond.Status == corev1.ConditionTrue +} + +func getPodConditionFromList(conditions []corev1.PodCondition, conditionType corev1.PodConditionType) *corev1.PodCondition { + for i := range conditions { + if conditions[i].Type == conditionType { + return &conditions[i] + } + } + return nil +} diff --git a/pkg/controller/component/interface.go b/pkg/controller/component/interface.go index fef5b0da..37c70623 100644 --- a/pkg/controller/component/interface.go +++ b/pkg/controller/component/interface.go @@ -44,3 +44,8 @@ type UpdateManager interface { // RestartPod restart the specified Pod RestartPod(nc *v1alpha1.NebulaCluster, ordinal int32) error } + +type FailoverManager interface { + Failover(nc *v1alpha1.NebulaCluster) error + Recovery(nc *v1alpha1.NebulaCluster) error +} diff --git a/pkg/controller/component/metad_cluster.go b/pkg/controller/component/metad_cluster.go index 1e300881..5ff713d8 100644 --- a/pkg/controller/component/metad_cluster.go +++ b/pkg/controller/component/metad_cluster.go @@ -118,11 +118,13 @@ func (c *metadCluster) syncMetadWorkload(nc *v1alpha1.NebulaCluster) error { return err } - timestamp, ok := oldWorkload.GetAnnotations()[annotation.AnnRestartTimestamp] - if ok { - if err := extender.SetTemplateAnnotations(newWorkload, - map[string]string{annotation.AnnRestartTimestamp: timestamp}); err != nil { - return err + if !notExist { + timestamp, ok := oldWorkload.GetAnnotations()[annotation.AnnRestartTimestamp] + if ok { + if err := extender.SetTemplateAnnotations(newWorkload, + map[string]string{annotation.AnnRestartTimestamp: timestamp}); err != nil { + return err + } } } diff --git a/pkg/controller/component/storaged_cluster.go b/pkg/controller/component/storaged_cluster.go index be9c9e3f..22f42ea6 100644 --- a/pkg/controller/component/storaged_cluster.go +++ b/pkg/controller/component/storaged_cluster.go @@ -19,15 +19,19 @@ package component import ( "fmt" "strconv" + "strings" + "time" + nebulago "github.com/vesoft-inc/nebula-go/v3/nebula" "github.com/vesoft-inc/nebula-go/v3/nebula/meta" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" - nebulago "github.com/vesoft-inc/nebula-go/v3/nebula" "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" "github.com/vesoft-inc/nebula-operator/apis/pkg/annotation" "github.com/vesoft-inc/nebula-operator/pkg/kube" @@ -39,10 +43,11 @@ import ( ) type storagedCluster struct { - clientSet kube.ClientSet - dm discovery.Interface - scaleManager ScaleManager - updateManager UpdateManager + clientSet kube.ClientSet + dm discovery.Interface + scaleManager ScaleManager + updateManager UpdateManager + failoverManager FailoverManager } func NewStoragedCluster( @@ -50,12 +55,14 @@ func NewStoragedCluster( dm discovery.Interface, sm ScaleManager, um UpdateManager, + fm FailoverManager, ) ReconcileManager { return &storagedCluster{ - clientSet: clientSet, - dm: dm, - scaleManager: sm, - updateManager: um, + clientSet: clientSet, + dm: dm, + scaleManager: sm, + updateManager: um, + failoverManager: fm, } } @@ -124,11 +131,13 @@ func (c *storagedCluster) syncStoragedWorkload(nc *v1alpha1.NebulaCluster) error return err } - timestamp, ok := oldWorkload.GetAnnotations()[annotation.AnnRestartTimestamp] - if ok { - if err := extender.SetTemplateAnnotations(newWorkload, - map[string]string{annotation.AnnRestartTimestamp: timestamp}); err != nil { - return err + if !notExist { + timestamp, ok := oldWorkload.GetAnnotations()[annotation.AnnRestartTimestamp] + if ok { + if err := extender.SetTemplateAnnotations(newWorkload, + map[string]string{annotation.AnnRestartTimestamp: timestamp}); err != nil { + return err + } } } @@ -181,6 +190,22 @@ func (c *storagedCluster) syncStoragedWorkload(nc *v1alpha1.NebulaCluster) error } } + if nc.IsAutoFailoverEnabled() { + r, err := c.shouldRecover(nc) + if err != nil { + return err + } + if r { + if err := c.failoverManager.Recovery(nc); err != nil { + return err + } + } else if nc.StoragedComponent().IsAutoFailovering() { + if err := c.failoverManager.Failover(nc); err != nil { + return err + } + } + } + if err := c.scaleManager.Scale(nc, oldWorkload, newWorkload); err != nil { klog.Errorf("scale storaged cluster [%s/%s] failed: %v", namespace, componentName, err) return err @@ -250,7 +275,67 @@ func (c *storagedCluster) syncNebulaClusterStatus( nc.Status.Storaged.Phase = v1alpha1.UpdatePhase } - // TODO: show storaged hosts state with storaged peers + options, err := nebula.ClientOptions(nc, nebula.SetIsMeta(true)) + if err != nil { + return err + } + hosts := []string{nc.GetMetadThriftConnAddress()} + metaClient, err := nebula.NewMetaClient(hosts, options...) + if err != nil { + return err + } + defer func() { + _ = metaClient.Disconnect() + }() + + hostItems, err := metaClient.ListHosts(meta.ListHostType_STORAGE) + if err != nil { + return err + } + thriftPort := nc.StoragedComponent().GetPort(v1alpha1.StoragedPortNameThrift) + for i := range hostItems { + host := hostItems[i] + if host.Status == meta.HostStatus_OFFLINE && host.HostAddr.Port == thriftPort { + podName := strings.Split(host.HostAddr.Host, ".")[0] + if nc.Status.Storaged.FailureHosts == nil { + nc.Status.Storaged.FailureHosts = make(map[string]v1alpha1.StoragedFailureHost) + } + fh, exits := nc.Status.Storaged.FailureHosts[podName] + if exits { + deadline := fh.CreationTime.Add(nc.Spec.FailoverPeriod.Duration) + if time.Now().After(deadline) { + if fh.ConfirmationTime.IsZero() { + fh.ConfirmationTime = metav1.Time{Time: time.Now()} + pvcs, err := getPodPvcs(c.clientSet, nc, podName) + if err != nil { + return err + } + pvcSet := make(map[types.UID]v1alpha1.EmptyStruct) + for _, pvc := range pvcs { + pvcSet[pvc.UID] = v1alpha1.EmptyStruct{} + } + fh.PVCSet = pvcSet + nc.Status.Storaged.FailureHosts[podName] = fh + if err := c.clientSet.NebulaCluster().UpdateNebulaCluster(nc); err != nil { + return err + } + klog.Infof("storaged pod [%s/%s] failover period exceeds %s", nc.Namespace, podName, nc.Spec.FailoverPeriod.Duration.String()) + } + } + continue + } + failureHost := v1alpha1.StoragedFailureHost{ + Host: host.HostAddr.Host, + CreationTime: metav1.Time{Time: time.Now()}, + } + nc.Status.Storaged.FailureHosts[podName] = failureHost + if err := c.clientSet.NebulaCluster().UpdateNebulaCluster(nc); err != nil { + return err + } + klog.Infof("failure storage host %s found", host.HostAddr.Host) + } + } + return syncComponentStatus(nc.StoragedComponent(), &nc.Status.Storaged.ComponentStatus, oldWorkload) } @@ -410,6 +495,57 @@ func (c *storagedCluster) updateZoneMappings(nc *v1alpha1.NebulaCluster, newRepl return nil } +func (c *storagedCluster) shouldRecover(nc *v1alpha1.NebulaCluster) (bool, error) { + if nc.Status.Storaged.FailureHosts == nil { + return false, nil + } + + fhSet := sets.New[string]() + for podName, fh := range nc.Status.Storaged.FailureHosts { + pod, err := c.clientSet.Pod().GetPod(nc.Namespace, podName) + if err != nil { + klog.Errorf("storaged pod [%s/%s] does not exist: %v", nc.Namespace, podName, err) + return false, err + } + if !isPodHealthy(pod) { + return false, nil + } + fhSet.Insert(fh.Host) + } + + options, err := nebula.ClientOptions(nc, nebula.SetIsMeta(true)) + if err != nil { + return false, err + } + hosts := []string{nc.GetMetadThriftConnAddress()} + metaClient, err := nebula.NewMetaClient(hosts, options...) + if err != nil { + return false, err + } + defer func() { + _ = metaClient.Disconnect() + }() + + hostItems, err := metaClient.ListHosts(meta.ListHostType_STORAGE) + if err != nil { + return false, err + } + thriftPort := nc.StoragedComponent().GetPort(v1alpha1.StoragedPortNameThrift) + for _, host := range hostItems { + if fhSet.Has(host.HostAddr.Host) && + host.Status != meta.HostStatus_ONLINE && + host.HostAddr.Port == thriftPort { + return false, nil + } + } + + if nc.Status.Storaged.BalancedAfterFailover != nil && !*nc.Status.Storaged.BalancedAfterFailover { + return false, nil + } + + return true, nil +} + type FakeStoragedCluster struct { err error } diff --git a/pkg/controller/component/storaged_failover.go b/pkg/controller/component/storaged_failover.go new file mode 100644 index 00000000..3101c21d --- /dev/null +++ b/pkg/controller/component/storaged_failover.go @@ -0,0 +1,405 @@ +/* +Copyright 2023 Vesoft Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package component + +import ( + "context" + "fmt" + "time" + + nebulago "github.com/vesoft-inc/nebula-go/v3/nebula" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + podutils "k8s.io/kubernetes/pkg/api/v1/pod" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" + "github.com/vesoft-inc/nebula-operator/apis/pkg/annotation" + "github.com/vesoft-inc/nebula-operator/apis/pkg/label" + "github.com/vesoft-inc/nebula-operator/pkg/kube" + "github.com/vesoft-inc/nebula-operator/pkg/nebula" + "github.com/vesoft-inc/nebula-operator/pkg/util/condition" + utilerrors "github.com/vesoft-inc/nebula-operator/pkg/util/errors" +) + +const ( + PVCProtectionFinalizer = "kubernetes.io/pvc-protection" + RestartTolerancePeriod = time.Minute * 1 +) + +var ( + taints = []*corev1.Taint{ + { + Key: corev1.TaintNodeUnreachable, + Effect: corev1.TaintEffectNoExecute, + }, + { + Key: corev1.TaintNodeNotReady, + Effect: corev1.TaintEffectNoExecute, + }, + } +) + +type storagedFailover struct { + client.Client + clientSet kube.ClientSet +} + +func NewStoragedFailover(c client.Client, clientSet kube.ClientSet) FailoverManager { + return &storagedFailover{Client: c, clientSet: clientSet} +} + +func (s *storagedFailover) Failover(nc *v1alpha1.NebulaCluster) error { + if err := s.tryRestartPod(nc); err != nil { + return err + } + readyPods, err := s.toleratePods(nc) + if err != nil { + return err + } + if len(readyPods) == len(nc.Status.Storaged.FailureHosts) { + return nil + } + if err := s.deleteFailureHost(nc); err != nil { + return err + } + if err := s.deleteFailurePodAndPVC(nc); err != nil { + return err + } + if err := s.checkPendingPod(nc); err != nil { + return err + } + if !pointer.BoolDeref(nc.Status.Storaged.BalancedAfterFailover, false) { + if err := s.balanceData(nc); err != nil { + return err + } + } + return nil +} + +func (s *storagedFailover) Recovery(nc *v1alpha1.NebulaCluster) error { + nc.Status.Storaged.FailureHosts = nil + klog.Infof("clearing storaged cluster [%s/%s] failure hosts", nc.GetNamespace(), nc.GetName()) + return nil +} + +func (s *storagedFailover) tryRestartPod(nc *v1alpha1.NebulaCluster) error { + for podName, fh := range nc.Status.Storaged.FailureHosts { + if fh.PodRestarted { + continue + } + if err := s.clientSet.Pod().DeletePod(nc.Namespace, podName, true); err != nil { + return err + } + fh.PodRestarted = true + nc.Status.Storaged.FailureHosts[podName] = fh + return utilerrors.ReconcileErrorf("try to restart failure storaged pod [%s/%s] for recovery", nc.Namespace, podName) + } + return nil +} + +func (s *storagedFailover) toleratePods(nc *v1alpha1.NebulaCluster) ([]string, error) { + readyPods := make([]string, 0) + for podName, fh := range nc.Status.Storaged.FailureHosts { + if fh.HostDeleted { + continue + } + pod, err := s.clientSet.Pod().GetPod(nc.Namespace, podName) + if err != nil { + return nil, err + } + if pod != nil && isPodTerminating(pod) { + node, err := s.clientSet.Node().GetNode(pod.Spec.NodeName) + if err != nil { + klog.Errorf("get node %s failed: %v", pod.Spec.NodeName, err) + return nil, err + } + if isNodeDown(node) { + fh.NodeDown = true + nc.Status.Storaged.FailureHosts[podName] = fh + continue + } + return nil, utilerrors.ReconcileErrorf("failure storaged pod [%s/%s] is deleting", nc.Namespace, podName) + } + if isPodHealthy(pod) { + readyPods = append(readyPods, podName) + continue + } + tolerance := pod.CreationTimestamp.Add(RestartTolerancePeriod) + if time.Now().Before(tolerance) { + return nil, utilerrors.ReconcileErrorf("waiting failure storaged pod [%s/%s] ready in tolerance period", nc.Namespace, podName) + } + } + return readyPods, nil +} + +func (s *storagedFailover) deleteFailureHost(nc *v1alpha1.NebulaCluster) error { + ns := nc.GetNamespace() + componentName := nc.StoragedComponent().GetName() + options, err := nebula.ClientOptions(nc, nebula.SetIsMeta(true)) + if err != nil { + return err + } + endpoints := []string{nc.GetMetadThriftConnAddress()} + metaClient, err := nebula.NewMetaClient(endpoints, options...) + if err != nil { + return err + } + defer func() { + err := metaClient.Disconnect() + if err != nil { + klog.Errorf("meta client disconnect failed: %v", err) + } + }() + + hosts := make([]*nebulago.HostAddr, 0) + for _, fh := range nc.Status.Storaged.FailureHosts { + if fh.HostDeleted { + continue + } + count, err := metaClient.GetLeaderCount(fh.Host) + if err != nil { + klog.Errorf("storaged host %s get leader count failed: %v", fh.Host, err) + return err + } + if count > 0 { + return utilerrors.ReconcileErrorf("waiting for storaged host %s peers leader election done", fh.Host) + } + hosts = append(hosts, &nebulago.HostAddr{ + Host: fh.Host, + Port: nc.StoragedComponent().GetPort(v1alpha1.StoragedPortNameThrift), + }) + } + if len(hosts) == 0 { + return nil + } + + spaces, err := metaClient.ListSpaces() + if err != nil { + return err + } + + if len(spaces) > 0 { + if nc.Status.Storaged.RemovedSpaces == nil { + nc.Status.Storaged.RemovedSpaces = make([]int32, 0, len(spaces)) + } + nc.Status.Storaged.BalancedAfterFailover = pointer.Bool(false) + } + for _, space := range spaces { + if contains(nc.Status.Storaged.RemovedSpaces, *space.Id.SpaceID) { + continue + } + if err := removeHost(s.clientSet, metaClient, nc, *space.Id.SpaceID, hosts); err != nil { + klog.Errorf("storaged cluster [%s/%s] remove failure hosts %v failed: %v", ns, componentName, hosts, err) + return err + } + klog.Infof("storaged cluster [%s/%s] remove failure hosts %v in the space %s successfully", ns, componentName, hosts, space.Name) + } + + for podName, fh := range nc.Status.Storaged.FailureHosts { + if fh.HostDeleted { + continue + } + fh.HostDeleted = true + nc.Status.Storaged.FailureHosts[podName] = fh + } + + nc.Status.Storaged.RemovedSpaces = nil + nc.Status.Storaged.LastBalanceJob = nil + return utilerrors.ReconcileErrorf("try to remove storaged cluster [%s/%s] failure host for recovery", ns, componentName) +} + +func (s *storagedFailover) deleteFailurePodAndPVC(nc *v1alpha1.NebulaCluster) error { + for podName, fh := range nc.Status.Storaged.FailureHosts { + if fh.PodRebuilt { + continue + } + pod, pvcs, err := s.getPodAndPvcs(nc, podName) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + if pod == nil { + klog.Infof("failure storaged pod [%s/%s] not found, skip", nc.Namespace, podName) + continue + } + if !isPodTerminating(pod) { + podScheduled := isPodConditionScheduledTrue(pod.Status.Conditions) + klog.Infof("scheduled condition of pod [%s/%s] is %v", nc.Namespace, podName, podScheduled) + if err := s.clientSet.Pod().DeletePod(nc.Namespace, podName, true); err != nil { + return err + } + } else { + klog.Infof("pod [%s/%s] has DeletionTimestamp set to %s", nc.Namespace, podName, pod.DeletionTimestamp.String()) + } + + for i := range pvcs { + pvc := pvcs[i] + if _, exist := fh.PVCSet[pvc.UID]; exist { + if pvc.DeletionTimestamp == nil { + if err := s.clientSet.PVC().DeletePVC(nc.Namespace, pvc.Name); err != nil { + return err + } + klog.Infof("delete failure storaged pod PVC [%s/%s] successfully", nc.Namespace, pvc.Name) + } else { + klog.Infof("PVC [%s/%s] has DeletionTimestamp set to %s", nc.Namespace, pvc.Name, pvc.DeletionTimestamp.String()) + } + } + } + fh.PodRebuilt = true + fh.DeletionTime = metav1.Time{Time: time.Now()} + nc.Status.Storaged.FailureHosts[podName] = fh + return utilerrors.ReconcileErrorf("try to delete failure storaged pod [%s/%s] for rebuilding", nc.Namespace, podName) + } + return nil +} + +func (s *storagedFailover) checkPendingPod(nc *v1alpha1.NebulaCluster) error { + for podName, fh := range nc.Status.Storaged.FailureHosts { + pod, pvcs, err := s.getPodAndPvcs(nc, podName) + if err != nil { + return err + } + if pod == nil { + return fmt.Errorf("rebuilt storaged pod [%s/%s] not found", nc.Namespace, podName) + } + for i := range pvcs { + pvc := pvcs[i] + if _, exist := fh.PVCSet[pvc.UID]; exist { + if pvc.DeletionTimestamp != nil && len(pvc.GetFinalizers()) > 0 { + if err := kube.UpdateFinalizer(context.TODO(), s.Client, pvc.DeepCopy(), kube.RemoveFinalizerOpType, PVCProtectionFinalizer); err != nil { + return err + } + return utilerrors.ReconcileErrorf("waiting for PVC [%s/%s] finalizer updated", nc.Namespace, pvc.Name) + } + } + } + if isPodConditionScheduledTrue(pod.Status.Conditions) && isPodPending(pod) { + klog.Infof("storagd pod [%s/%s] conditions %v", nc.Namespace, podName, pod.Status.Conditions) + if err := s.clientSet.Pod().DeletePod(nc.Namespace, podName, true); err != nil { + return err + } + return utilerrors.ReconcileErrorf("waiting for pending storaged pod [%s/%s] deleted", nc.Namespace, podName) + } + } + return nil +} + +func (s *storagedFailover) getPodAndPvcs(nc *v1alpha1.NebulaCluster, podName string) (*corev1.Pod, []corev1.PersistentVolumeClaim, error) { + pod, err := s.clientSet.Pod().GetPod(nc.Namespace, podName) + if err != nil { + return nil, nil, err + } + pvcs, err := getPodPvcs(s.clientSet, nc, podName) + if err != nil { + return pod, nil, err + } + return pod, pvcs, nil +} + +func (s *storagedFailover) balanceData(nc *v1alpha1.NebulaCluster) error { + for podName := range nc.Status.Storaged.FailureHosts { + pod, err := s.clientSet.Pod().GetPod(nc.Namespace, podName) + if err != nil { + return err + } + if !podutils.IsPodReady(pod) { + return utilerrors.ReconcileErrorf("rebuilt storaged pod [%s/%s] is not ready", nc.Namespace, podName) + } + } + + options, err := nebula.ClientOptions(nc, nebula.SetIsMeta(true)) + if err != nil { + return err + } + endpoints := []string{nc.GetMetadThriftConnAddress()} + metaClient, err := nebula.NewMetaClient(endpoints, options...) + if err != nil { + klog.Errorf("create meta client failed: %v", err) + return err + } + defer func() { + err := metaClient.Disconnect() + if err != nil { + klog.Errorf("disconnect meta client failed: %v", err) + } + }() + + spaces, err := metaClient.ListSpaces() + if err != nil { + return err + } + if len(spaces) > 0 && nc.Status.Storaged.BalancedSpaces == nil { + nc.Status.Storaged.BalancedSpaces = make([]int32, 0, len(spaces)) + } + for _, space := range spaces { + if contains(nc.Status.Storaged.BalancedSpaces, *space.Id.SpaceID) { + continue + } + if err := balanceSpace(s.clientSet, metaClient, nc, *space.Id.SpaceID); err != nil { + return err + } + } + + nc.Status.Storaged.BalancedSpaces = nil + nc.Status.Storaged.LastBalanceJob = nil + nc.Status.Storaged.BalancedAfterFailover = pointer.Bool(true) + return s.clientSet.NebulaCluster().UpdateNebulaCluster(nc) +} + +func getPodPvcs(clientSet kube.ClientSet, nc *v1alpha1.NebulaCluster, podName string) ([]corev1.PersistentVolumeClaim, error) { + cl := label.New().Cluster(nc.GetClusterName()).Storaged() + cl[annotation.AnnPodNameKey] = podName + pvcSelector, err := cl.Selector() + if err != nil { + return nil, err + } + pvcs, err := clientSet.PVC().ListPVCs(nc.Namespace, pvcSelector) + if err != nil && !apierrors.IsNotFound(err) { + return nil, err + } + return pvcs, nil +} + +func isNodeDown(node *corev1.Node) bool { + for i := range taints { + if taintExists(node.Spec.Taints, taints[i]) { + klog.Infof("node %s found taint %s", node.Name, taints[i].Key) + return true + } + } + if condition.IsNodeReadyFalseOrUnknown(&node.Status) { + klog.Infof("node %s is not ready", node.Name) + conditions := condition.GetNodeTrueConditions(&node.Status) + for i := range conditions { + klog.Infof("node %s condition type %s is true", conditions[i].Type) + } + return true + } + return false +} + +func taintExists(taints []corev1.Taint, taintToFind *corev1.Taint) bool { + for _, taint := range taints { + if taint.MatchTaint(taintToFind) { + return true + } + } + return false +} diff --git a/pkg/controller/component/storaged_scaler.go b/pkg/controller/component/storaged_scaler.go index a9694b84..ba4bc181 100644 --- a/pkg/controller/component/storaged_scaler.go +++ b/pkg/controller/component/storaged_scaler.go @@ -23,7 +23,6 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/client" nebulago "github.com/vesoft-inc/nebula-go/v3/nebula" "github.com/vesoft-inc/nebula-operator/apis/apps/v1alpha1" @@ -34,12 +33,11 @@ import ( ) type storageScaler struct { - client.Client clientSet kube.ClientSet } -func NewStorageScaler(cli client.Client, clientSet kube.ClientSet) ScaleManager { - return &storageScaler{Client: cli, clientSet: clientSet} +func NewStorageScaler(clientSet kube.ClientSet) ScaleManager { + return &storageScaler{clientSet: clientSet} } func (ss *storageScaler) Scale(nc *v1alpha1.NebulaCluster, oldUnstruct, newUnstruct *unstructured.Unstructured) error { @@ -106,7 +104,7 @@ func (ss *storageScaler) ScaleOut(nc *v1alpha1.NebulaCluster) error { if contains(nc.Status.Storaged.BalancedSpaces, *space.Id.SpaceID) { continue } - if err := ss.balanceSpace(metaClient, nc, *space.Id.SpaceID); err != nil { + if err := balanceSpace(ss.clientSet, metaClient, nc, *space.Id.SpaceID); err != nil { return err } } @@ -168,7 +166,7 @@ func (ss *storageScaler) ScaleIn(nc *v1alpha1.NebulaCluster, oldReplicas, newRep } return err } - if isPending(pod) { + if isPodPending(pod) { klog.Infof("skip host for pod [%s/%s] status is Pending", pod.Namespace, pod.Name) continue } @@ -183,7 +181,7 @@ func (ss *storageScaler) ScaleIn(nc *v1alpha1.NebulaCluster, oldReplicas, newRep if contains(nc.Status.Storaged.RemovedSpaces, *space.Id.SpaceID) { continue } - if err := ss.removeHost(metaClient, nc, *space.Id.SpaceID, hosts); err != nil { + if err := removeHost(ss.clientSet, metaClient, nc, *space.Id.SpaceID, hosts); err != nil { klog.Errorf("remove hosts %v failed: %v", hosts, err) return err } @@ -235,117 +233,3 @@ func (ss *storageScaler) ScaleIn(nc *v1alpha1.NebulaCluster, oldReplicas, newRep nc.Status.Storaged.Phase = v1alpha1.RunningPhase return nil } - -func (ss *storageScaler) balanceSpace(mc nebula.MetaInterface, nc *v1alpha1.NebulaCluster, spaceID nebulago.GraphSpaceID) error { - if nc.Status.Storaged.LastBalanceJob != nil && nc.Status.Storaged.LastBalanceJob.SpaceID == spaceID { - if err := mc.BalanceStatus(nc.Status.Storaged.LastBalanceJob.JobID, nc.Status.Storaged.LastBalanceJob.SpaceID); err != nil { - if err == nebula.ErrJobStatusFailed { - klog.Infof("space %d balance job %d will be recovered", nc.Status.Storaged.LastBalanceJob.SpaceID, nc.Status.Storaged.LastBalanceJob.JobID) - if nc.IsZoneEnabled() { - return mc.RecoverInZoneBalanceJob(nc.Status.Storaged.LastBalanceJob.JobID, nc.Status.Storaged.LastBalanceJob.SpaceID) - } - return mc.RecoverDataBalanceJob(nc.Status.Storaged.LastBalanceJob.JobID, nc.Status.Storaged.LastBalanceJob.SpaceID) - } - return err - } - if err := mc.BalanceLeader(nc.Status.Storaged.LastBalanceJob.SpaceID); err != nil { - return err - } - nc.Status.Storaged.BalancedSpaces = append(nc.Status.Storaged.BalancedSpaces, nc.Status.Storaged.LastBalanceJob.SpaceID) - return ss.clientSet.NebulaCluster().UpdateNebulaClusterStatus(nc) - } - - namespace := nc.GetNamespace() - componentName := nc.StoragedComponent().GetName() - if nc.IsZoneEnabled() { - jobID, err := mc.BalanceDataInZone(spaceID) - if err != nil { - klog.Errorf("storaged cluster [%s/%s] balance data in zone error: %v", namespace, componentName, err) - if jobID > 0 { - nc.Status.Storaged.LastBalanceJob = &v1alpha1.BalanceJob{ - SpaceID: spaceID, - JobID: jobID, - } - if err := ss.clientSet.NebulaCluster().UpdateNebulaClusterStatus(nc); err != nil { - return err - } - } - return err - } - return nil - } - - jobID, err := mc.BalanceData(spaceID) - if err != nil { - klog.Errorf("storaged cluster [%s/%s] balance data across zone error: %v", namespace, componentName, err) - if jobID > 0 { - nc.Status.Storaged.LastBalanceJob = &v1alpha1.BalanceJob{ - SpaceID: spaceID, - JobID: jobID, - } - if err := ss.clientSet.NebulaCluster().UpdateNebulaClusterStatus(nc); err != nil { - return err - } - } - return err - } - return nil -} - -func (ss *storageScaler) removeHost( - mc nebula.MetaInterface, - nc *v1alpha1.NebulaCluster, - spaceID nebulago.GraphSpaceID, - hosts []*nebulago.HostAddr, -) error { - if nc.Status.Storaged.LastBalanceJob != nil && nc.Status.Storaged.LastBalanceJob.SpaceID == spaceID { - if err := mc.BalanceStatus(nc.Status.Storaged.LastBalanceJob.JobID, nc.Status.Storaged.LastBalanceJob.SpaceID); err != nil { - if err == nebula.ErrJobStatusFailed { - klog.Infof("space %d balance job %d will be recovered", nc.Status.Storaged.LastBalanceJob.SpaceID, nc.Status.Storaged.LastBalanceJob.JobID) - if nc.IsZoneEnabled() { - return mc.RecoverInZoneBalanceJob(nc.Status.Storaged.LastBalanceJob.JobID, nc.Status.Storaged.LastBalanceJob.SpaceID) - } - return mc.RecoverDataBalanceJob(nc.Status.Storaged.LastBalanceJob.JobID, nc.Status.Storaged.LastBalanceJob.SpaceID) - } - return err - } - nc.Status.Storaged.RemovedSpaces = append(nc.Status.Storaged.RemovedSpaces, nc.Status.Storaged.LastBalanceJob.SpaceID) - return ss.clientSet.NebulaCluster().UpdateNebulaClusterStatus(nc) - } - - namespace := nc.GetNamespace() - componentName := nc.StoragedComponent().GetName() - if nc.IsZoneEnabled() { - jobID, err := mc.RemoveHostInZone(spaceID, hosts) - klog.Errorf("storaged cluster [%s/%s] remove host in zone error: %v", namespace, componentName, err) - if err != nil { - if jobID > 0 { - nc.Status.Storaged.LastBalanceJob = &v1alpha1.BalanceJob{ - SpaceID: spaceID, - JobID: jobID, - } - if err := ss.clientSet.NebulaCluster().UpdateNebulaClusterStatus(nc); err != nil { - return err - } - } - return err - } - return nil - } - - jobID, err := mc.RemoveHost(spaceID, hosts) - if err != nil { - klog.Errorf("storaged cluster [%s/%s] remove host across zone error: %v", namespace, componentName, err) - if jobID > 0 { - nc.Status.Storaged.LastBalanceJob = &v1alpha1.BalanceJob{ - SpaceID: spaceID, - JobID: jobID, - } - if err := ss.clientSet.NebulaCluster().UpdateNebulaClusterStatus(nc); err != nil { - return err - } - } - return err - } - return nil -} diff --git a/pkg/controller/component/storaged_updater.go b/pkg/controller/component/storaged_updater.go index 44065964..1bb26d26 100644 --- a/pkg/controller/component/storaged_updater.go +++ b/pkg/controller/component/storaged_updater.go @@ -26,7 +26,6 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/klog/v2" - "sigs.k8s.io/controller-runtime/pkg/client" nebulago "github.com/vesoft-inc/nebula-go/v3/nebula" "github.com/vesoft-inc/nebula-go/v3/nebula/meta" @@ -50,12 +49,11 @@ const ( ) type storagedUpdater struct { - client.Client clientSet kube.ClientSet } -func NewStoragedUpdater(cli client.Client, clientSet kube.ClientSet) UpdateManager { - return &storagedUpdater{Client: cli, clientSet: clientSet} +func NewStoragedUpdater(clientSet kube.ClientSet) UpdateManager { + return &storagedUpdater{clientSet: clientSet} } func (s *storagedUpdater) Update( @@ -151,7 +149,7 @@ func (s *storagedUpdater) RestartPod(nc *v1alpha1.NebulaCluster, ordinal int32) empty := len(spaces) == 0 if empty || nc.IsForceUpdateEnabled() { - return s.clientSet.Pod().DeletePod(namespace, updatePodName) + return s.clientSet.Pod().DeletePod(namespace, updatePodName, false) } updatePod, err := s.clientSet.Pod().GetPod(namespace, updatePodName) @@ -174,7 +172,7 @@ func (s *storagedUpdater) RestartPod(nc *v1alpha1.NebulaCluster, ordinal int32) host := nc.StoragedComponent().GetPodFQDN(ordinal) if s.readyToUpdate(mc, host, updatePod) { - return s.clientSet.Pod().DeletePod(namespace, updatePodName) + return s.clientSet.Pod().DeletePod(namespace, updatePodName, false) } if err := s.transLeaderIfNecessary(nc, mc, ordinal); err != nil { diff --git a/pkg/controller/nebulacluster/nebula_cluster_controller.go b/pkg/controller/nebulacluster/nebula_cluster_controller.go index ebab7efb..21bd6121 100644 --- a/pkg/controller/nebulacluster/nebula_cluster_controller.go +++ b/pkg/controller/nebulacluster/nebula_cluster_controller.go @@ -21,7 +21,7 @@ import ( "fmt" "time" - kruisev1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1" + kruisev1beta1 "github.com/openkruise/kruise-api/apps/v1beta1" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -59,10 +59,11 @@ func NewClusterReconciler(mgr ctrl.Manager, enableKruise bool) (*ClusterReconcil return nil, err } - sm := component.NewStorageScaler(mgr.GetClient(), clientSet) + sm := component.NewStorageScaler(clientSet) graphdUpdater := component.NewGraphdUpdater(clientSet.Pod()) metadUpdater := component.NewMetadUpdater(clientSet.Pod()) - storagedUpdater := component.NewStoragedUpdater(mgr.GetClient(), clientSet) + storagedUpdater := component.NewStoragedUpdater(clientSet) + storagedFailover := component.NewStoragedFailover(mgr.GetClient(), clientSet) dm, err := discutil.New(mgr.GetConfig()) if err != nil { @@ -73,12 +74,12 @@ func NewClusterReconciler(mgr ctrl.Manager, enableKruise bool) (*ClusterReconcil return nil, fmt.Errorf("create apiserver info failed: %v", err) } - evenPodsSpread, err := kube.EnableEvenPodsSpread(info) + valid, err := kube.ValidVersion(info) if err != nil { - return nil, fmt.Errorf("get feature failed: %v", err) + return nil, fmt.Errorf("get server version failed: %v", err) } - if !evenPodsSpread { - return nil, fmt.Errorf("EvenPodsSpread feauture not supported") + if !valid { + return nil, fmt.Errorf("server version not supported") } return &ClusterReconciler{ @@ -97,7 +98,8 @@ func NewClusterReconciler(mgr ctrl.Manager, enableKruise bool) (*ClusterReconcil clientSet, dm, sm, - storagedUpdater), + storagedUpdater, + storagedFailover), component.NewNebulaExporter(clientSet), component.NewNebulaConsole(clientSet), reclaimer.NewMetaReconciler(clientSet), @@ -162,7 +164,6 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re return ctrl.Result{}, fmt.Errorf("openkruise scheme not registered") } - // TODO: check lm license key valid if err := r.syncNebulaCluster(nebulaCluster.DeepCopy()); err != nil { isReconcileError := func(err error) (b bool) { defer func() { @@ -197,7 +198,7 @@ func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { Owns(&corev1.ConfigMap{}). Owns(&corev1.Service{}). Owns(&appsv1.StatefulSet{}). - Owns(&kruisev1alpha1.StatefulSet{}). + Owns(&kruisev1beta1.StatefulSet{}). Owns(&appsv1.Deployment{}). Complete(r) } diff --git a/pkg/kube/kube.go b/pkg/kube/kube.go index 235156d9..fabcd197 100644 --- a/pkg/kube/kube.go +++ b/pkg/kube/kube.go @@ -17,13 +17,58 @@ limitations under the License. package kube import ( + "context" + "fmt" "regexp" "strconv" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/version" + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) -func EnableEvenPodsSpread(ver *version.Info) (bool, error) { +type FinalizerOpType string + +const ( + AddFinalizerOpType FinalizerOpType = "Add" + RemoveFinalizerOpType FinalizerOpType = "Remove" +) + +func UpdateFinalizer(ctx context.Context, c client.Client, object client.Object, op FinalizerOpType, finalizer string) error { + switch op { + case AddFinalizerOpType, RemoveFinalizerOpType: + default: + return fmt.Errorf("unsupported op type %s", op) + } + + key := client.ObjectKeyFromObject(object) + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + fetchedObject := object.DeepCopyObject().(client.Object) + if err := c.Get(ctx, key, fetchedObject); err != nil { + return err + } + finalizers := fetchedObject.GetFinalizers() + switch op { + case AddFinalizerOpType: + if controllerutil.ContainsFinalizer(fetchedObject, finalizer) { + return nil + } + finalizers = append(finalizers, finalizer) + case RemoveFinalizerOpType: + finalizerSet := sets.NewString(finalizers...) + if !finalizerSet.Has(finalizer) { + return nil + } + finalizers = finalizerSet.Delete(finalizer).List() + } + fetchedObject.SetFinalizers(finalizers) + return c.Update(ctx, fetchedObject) + }) +} + +func ValidVersion(ver *version.Info) (bool, error) { major, err := strconv.Atoi(ver.Major) if err != nil { return false, err diff --git a/pkg/kube/nebulacluster.go b/pkg/kube/nebulacluster.go index 956b1c47..23186e0c 100644 --- a/pkg/kube/nebulacluster.go +++ b/pkg/kube/nebulacluster.go @@ -19,6 +19,7 @@ package kube import ( "context" "fmt" + "reflect" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" @@ -79,16 +80,22 @@ func (c *nebulaClusterClient) UpdateNebulaCluster(nc *v1alpha1.NebulaCluster) er return retry.RetryOnConflict(retry.DefaultBackoff, func() error { // Update the set with the latest resource version for the next poll - if updated, err := c.GetNebulaCluster(ns, ncName); err == nil { - nc = updated.DeepCopy() - nc.Spec = *ncSpec - nc.SetLabels(labels) - nc.SetAnnotations(annotations) - } else { + ncClone, err := c.GetNebulaCluster(ns, ncName) + if err != nil { utilruntime.HandleError(fmt.Errorf("get NebulaCluster %s/%s failed: %v", ns, ncName, err)) return err } + if reflect.DeepEqual(ncSpec, ncClone.Spec) && + reflect.DeepEqual(labels, ncClone.Labels) && + reflect.DeepEqual(annotations, ncClone.Annotations) { + return nil + } + + nc = ncClone.DeepCopy() + nc.Spec = *ncSpec + nc.SetLabels(labels) + nc.SetAnnotations(annotations) updateErr := c.client.Update(context.TODO(), nc) if updateErr == nil { klog.Infof("NebulaCluster %s/%s updated successfully", ns, ncName) @@ -105,18 +112,21 @@ func (c *nebulaClusterClient) UpdateNebulaClusterStatus(nc *v1alpha1.NebulaClust status := nc.Status.DeepCopy() return retry.RetryOnConflict(retry.DefaultBackoff, func() error { - if updated, err := c.GetNebulaCluster(ns, ncName); err == nil { - // make a copy, so we don't mutate the shared cache - nc = updated.DeepCopy() - nc.Status = *status - } else { + ncClone, err := c.GetNebulaCluster(ns, ncName) + if err != nil { utilruntime.HandleError(fmt.Errorf("get NebulaCluster [%s/%s] failed: %v", ns, ncName, err)) return err } + if reflect.DeepEqual(status, ncClone.Status) { + return nil + } + + nc = ncClone.DeepCopy() + nc.Status = *status updateErr := c.client.Status().Update(context.TODO(), nc) if updateErr == nil { - klog.Infof("NebulaCluster [%s/%s] updated successfully", ns, ncName) + klog.Infof("NebulaCluster [%s/%s] status updated successfully", ns, ncName) return nil } klog.Errorf("update NebulaCluster [%s/%s] status failed: %v", ns, ncName, updateErr) diff --git a/pkg/kube/pod.go b/pkg/kube/pod.go index 28d805d8..deac63c1 100644 --- a/pkg/kube/pod.go +++ b/pkg/kube/pod.go @@ -36,7 +36,7 @@ type Pod interface { GetPod(namespace string, name string) (*corev1.Pod, error) CreatePod(pod *corev1.Pod) error UpdatePod(pod *corev1.Pod) error - DeletePod(namespace string, name string) error + DeletePod(namespace string, name string, nonGraceful bool) error ListPods(namespace string, selector labels.Selector) ([]corev1.Pod, error) } @@ -96,7 +96,7 @@ func (pd *podClient) UpdatePod(pod *corev1.Pod) error { }) } -func (pd *podClient) DeletePod(namespace, name string) error { +func (pd *podClient) DeletePod(namespace, name string, nonGraceful bool) error { pod := &corev1.Pod{} if err := pd.kubecli.Get(context.TODO(), types.NamespacedName{ Namespace: namespace, @@ -105,11 +105,15 @@ func (pd *podClient) DeletePod(namespace, name string) error { return err } - policy := metav1.DeletePropagationBackground - options := &client.DeleteOptions{ - PropagationPolicy: &policy, + preconditions := metav1.Preconditions{UID: &pod.UID, ResourceVersion: &pod.ResourceVersion} + deleteOptions := &client.DeleteOptions{Preconditions: &preconditions} + if nonGraceful { + gracePeriod := int64(0) + propagationPolicy := metav1.DeletePropagationBackground + deleteOptions.GracePeriodSeconds = &gracePeriod + deleteOptions.PropagationPolicy = &propagationPolicy } - return pd.kubecli.Delete(context.TODO(), pod, options) + return pd.kubecli.Delete(context.TODO(), pod, deleteOptions) } func (pd *podClient) ListPods(namespace string, selector labels.Selector) ([]corev1.Pod, error) { diff --git a/pkg/util/condition/node_condition.go b/pkg/util/condition/node_condition.go new file mode 100644 index 00000000..65229e6a --- /dev/null +++ b/pkg/util/condition/node_condition.go @@ -0,0 +1,61 @@ +/* +Copyright 2023 Vesoft Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package condition + +import ( + corev1 "k8s.io/api/core/v1" +) + +func GetNodeTrueConditions(status *corev1.NodeStatus) []corev1.NodeCondition { + if status == nil { + return nil + } + return filterOutNodeConditionByStatus(status.Conditions, corev1.ConditionTrue) +} + +func IsNodeReadyFalseOrUnknown(status *corev1.NodeStatus) bool { + condition := getNodeReadyCondition(status) + return condition != nil && (condition.Status == corev1.ConditionFalse || condition.Status == corev1.ConditionUnknown) +} + +func getNodeReadyCondition(status *corev1.NodeStatus) *corev1.NodeCondition { + if status == nil { + return nil + } + condition := filterOutNodeConditionByType(status.Conditions, corev1.NodeReady) + return condition +} + +func filterOutNodeConditionByType(conditions []corev1.NodeCondition, conditionType corev1.NodeConditionType) *corev1.NodeCondition { + for i := range conditions { + if conditions[i].Type == conditionType { + return &conditions[i] + } + } + + return nil +} + +func filterOutNodeConditionByStatus(conditions []corev1.NodeCondition, conditionStatus corev1.ConditionStatus) []corev1.NodeCondition { + filtered := make([]corev1.NodeCondition, 0) + for i := range conditions { + if conditions[i].Status == conditionStatus { + filtered = append(filtered, conditions[i]) + } + } + return filtered +} diff --git a/pkg/util/extender/unstructured.go b/pkg/util/extender/unstructured.go index 7d8a5104..a1aeef1d 100644 --- a/pkg/util/extender/unstructured.go +++ b/pkg/util/extender/unstructured.go @@ -183,9 +183,12 @@ func PodTemplateEqual(newUnstruct, oldUnstruct *unstructured.Unstructured) bool func ObjectEqual(newUnstruct, oldUnstruct *unstructured.Unstructured) bool { annotations := map[string]string{} for k, v := range oldUnstruct.GetAnnotations() { - if k != annotation.AnnLastAppliedConfigKey { - annotations[k] = v + if k == annotation.AnnLastAppliedConfigKey || + k == annotation.AnnRestartTimestamp || + k == annotation.AnnLastReplicas { + continue } + annotations[k] = v } if !apiequality.Semantic.DeepEqual(newUnstruct.GetAnnotations(), annotations) { return false @@ -317,15 +320,15 @@ func SetUpdatePartition( if err := SetSpecField(obj, upgradeOrdinal, "updateStrategy", "rollingUpdate", "partition"); err != nil { return err } - if advanced { - if err := SetSpecField(obj, "InPlaceIfPossible", "updateStrategy", "rollingUpdate", "podUpdatePolicy"); err != nil { - return err - } - if err := SetSpecField(obj, gracePeriod, - "updateStrategy", "rollingUpdate", "inPlaceUpdateStrategy", "gracePeriodSeconds"); err != nil { - return err - } - } + //if advanced { + // if err := SetSpecField(obj, "InPlaceIfPossible", "updateStrategy", "rollingUpdate", "podUpdatePolicy"); err != nil { + // return err + // } + // if err := SetSpecField(obj, gracePeriod, + // "updateStrategy", "rollingUpdate", "inPlaceUpdateStrategy", "gracePeriodSeconds"); err != nil { + // return err + // } + //} return nil } diff --git a/pkg/util/resource/resource.go b/pkg/util/resource/resource.go index dcfe7225..72479b1b 100644 --- a/pkg/util/resource/resource.go +++ b/pkg/util/resource/resource.go @@ -17,7 +17,7 @@ limitations under the License. package resource import ( - kruisev1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1" + kruisev1beta1 "github.com/openkruise/kruise-api/apps/v1beta1" appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" @@ -31,13 +31,10 @@ type GVRFunc func() schema.GroupVersionResource var ( NebulaClusterKind = v1alpha1.SchemeGroupVersion.WithKind("NebulaCluster") StatefulSetKind = appsv1.SchemeGroupVersion.WithKind("StatefulSet") - AdvancedStatefulSetKind = kruisev1alpha1.SchemeGroupVersion.WithKind("StatefulSet") - UnitedDeploymentKind = kruisev1alpha1.SchemeGroupVersion.WithKind("UnitedDeployment") + AdvancedStatefulSetKind = kruisev1beta1.SchemeGroupVersion.WithKind("StatefulSet") GroupVersionResources = map[string]GVRFunc{ - StatefulSetKind.String(): GetStatefulSetGVR, - AdvancedStatefulSetKind.String(): GetAdvancedStatefulSetGVR, - UnitedDeploymentKind.String(): GetUniteDeploymentGVR, + StatefulSetKind.String(): GetStatefulSetGVR, } ) @@ -52,7 +49,7 @@ func GetStatefulSetGVR() schema.GroupVersionResource { func GetAdvancedStatefulSetGVR() schema.GroupVersionResource { return schema.GroupVersionResource{ Group: "apps.kruise.io", - Version: "v1alpha1", + Version: "v1beta1", Resource: "statefulsets", } }