From 832f1f2701cd30fdb035355bd0d51b8a90a5ea92 Mon Sep 17 00:00:00 2001 From: "kevin.qiao" Date: Fri, 17 Nov 2023 16:41:46 +0800 Subject: [PATCH] support component topology constraints (#400) --- apis/apps/v1alpha1/nebulacluster_common.go | 2 +- apis/apps/v1alpha1/nebulacluster_component.go | 30 ++++++++---- apis/apps/v1alpha1/nebulacluster_types.go | 3 ++ apis/apps/v1alpha1/zz_generated.deepcopy.go | 5 ++ .../nebula-operator/crds/nebulaclusters.yaml | 48 +++++++++++++++++++ .../apps.nebula-graph.io_nebulaclusters.yaml | 48 +++++++++++++++++++ pkg/controller/component/storaged_failover.go | 4 +- 7 files changed, 129 insertions(+), 11 deletions(-) diff --git a/apis/apps/v1alpha1/nebulacluster_common.go b/apis/apps/v1alpha1/nebulacluster_common.go index 33abfb10..e4ba9bcc 100644 --- a/apis/apps/v1alpha1/nebulacluster_common.go +++ b/apis/apps/v1alpha1/nebulacluster_common.go @@ -632,7 +632,7 @@ func generateStatefulSet(c NebulaClusterComponent, cm *corev1.ConfigMap) (*appsv ServiceAccountName: NebulaServiceAccountName, } - podSpec.TopologySpreadConstraints = getTopologySpreadConstraints(nc.Spec.TopologySpreadConstraints, componentLabel) + podSpec.TopologySpreadConstraints = c.ComponentSpec().TopologySpreadConstraints(componentLabel) volumeClaim, err := c.GenerateVolumeClaim() if err != nil { diff --git a/apis/apps/v1alpha1/nebulacluster_component.go b/apis/apps/v1alpha1/nebulacluster_component.go index 6b3266fb..a844158e 100644 --- a/apis/apps/v1alpha1/nebulacluster_component.go +++ b/apis/apps/v1alpha1/nebulacluster_component.go @@ -37,6 +37,7 @@ type ComponentAccessor interface { NodeSelector() map[string]string Affinity() *corev1.Affinity Tolerations() []corev1.Toleration + TopologySpreadConstraints(labels map[string]string) []corev1.TopologySpreadConstraint SecurityContext() *corev1.SecurityContext InitContainers() []corev1.Container SidecarContainers() []corev1.Container @@ -50,10 +51,11 @@ var _ ComponentAccessor = &componentAccessor{} // +k8s:deepcopy-gen=false type componentAccessor struct { - nodeSelector map[string]string - affinity *corev1.Affinity - tolerations []corev1.Toleration - componentSpec *ComponentSpec + nodeSelector map[string]string + affinity *corev1.Affinity + tolerations []corev1.Toleration + topologySpreadConstraints []TopologySpreadConstraint + componentSpec *ComponentSpec } func (a *componentAccessor) Replicas() int32 { @@ -125,6 +127,17 @@ func (a *componentAccessor) Tolerations() []corev1.Toleration { return a.componentSpec.Tolerations } +func (a *componentAccessor) TopologySpreadConstraints(labels map[string]string) []corev1.TopologySpreadConstraint { + tscs := a.topologySpreadConstraints + if a.componentSpec != nil && len(a.componentSpec.TopologySpreadConstraints) > 0 { + tscs = a.componentSpec.TopologySpreadConstraints + } + if len(tscs) == 0 { + return nil + } + return getTopologySpreadConstraints(tscs, labels) +} + func (a *componentAccessor) SecurityContext() *corev1.SecurityContext { if a.componentSpec == nil { return nil @@ -301,9 +314,10 @@ func (c *baseComponent) GenerateOwnerReferences() []metav1.OwnerReference { func buildComponentAccessor(nc *NebulaCluster, componentSpec *ComponentSpec) ComponentAccessor { return &componentAccessor{ - nodeSelector: nc.Spec.NodeSelector, - affinity: nc.Spec.Affinity, - tolerations: nc.Spec.Tolerations, - componentSpec: componentSpec, + nodeSelector: nc.Spec.NodeSelector, + affinity: nc.Spec.Affinity, + tolerations: nc.Spec.Tolerations, + topologySpreadConstraints: nc.Spec.TopologySpreadConstraints, + componentSpec: componentSpec, } } diff --git a/apis/apps/v1alpha1/nebulacluster_types.go b/apis/apps/v1alpha1/nebulacluster_types.go index 2623e733..710a949b 100644 --- a/apis/apps/v1alpha1/nebulacluster_types.go +++ b/apis/apps/v1alpha1/nebulacluster_types.go @@ -489,6 +489,9 @@ type ComponentSpec struct { // +optional Tolerations []corev1.Toleration `json:"tolerations,omitempty"` + // +optional + TopologySpreadConstraints []TopologySpreadConstraint `json:"topologySpreadConstraints,omitempty"` + // +optional SecurityContext *corev1.SecurityContext `json:"securityContext,omitempty"` diff --git a/apis/apps/v1alpha1/zz_generated.deepcopy.go b/apis/apps/v1alpha1/zz_generated.deepcopy.go index 1c08e2f2..5d3731d7 100644 --- a/apis/apps/v1alpha1/zz_generated.deepcopy.go +++ b/apis/apps/v1alpha1/zz_generated.deepcopy.go @@ -133,6 +133,11 @@ func (in *ComponentSpec) DeepCopyInto(out *ComponentSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.TopologySpreadConstraints != nil { + in, out := &in.TopologySpreadConstraints, &out.TopologySpreadConstraints + *out = make([]TopologySpreadConstraint, len(*in)) + copy(*out, *in) + } if in.SecurityContext != nil { in, out := &in.SecurityContext, &out.SecurityContext *out = new(v1.SecurityContext) diff --git a/charts/nebula-operator/crds/nebulaclusters.yaml b/charts/nebula-operator/crds/nebulaclusters.yaml index b1b292a5..61d95512 100644 --- a/charts/nebula-operator/crds/nebulaclusters.yaml +++ b/charts/nebula-operator/crds/nebulaclusters.yaml @@ -2449,6 +2449,18 @@ spec: type: string type: object type: array + topologySpreadConstraints: + items: + properties: + topologyKey: + type: string + whenUnsatisfiable: + type: string + required: + - topologyKey + - whenUnsatisfiable + type: object + type: array version: default: latest type: string @@ -5199,6 +5211,18 @@ spec: type: string type: object type: array + topologySpreadConstraints: + items: + properties: + topologyKey: + type: string + whenUnsatisfiable: + type: string + required: + - topologyKey + - whenUnsatisfiable + type: object + type: array version: default: latest type: string @@ -8008,6 +8032,18 @@ spec: type: string type: object type: array + topologySpreadConstraints: + items: + properties: + topologyKey: + type: string + whenUnsatisfiable: + type: string + required: + - topologyKey + - whenUnsatisfiable + type: object + type: array version: default: latest type: string @@ -10844,6 +10880,18 @@ spec: type: string type: object type: array + topologySpreadConstraints: + items: + properties: + topologyKey: + type: string + whenUnsatisfiable: + type: string + required: + - topologyKey + - whenUnsatisfiable + type: object + type: array version: default: latest type: string diff --git a/config/crd/bases/apps.nebula-graph.io_nebulaclusters.yaml b/config/crd/bases/apps.nebula-graph.io_nebulaclusters.yaml index b1b292a5..61d95512 100644 --- a/config/crd/bases/apps.nebula-graph.io_nebulaclusters.yaml +++ b/config/crd/bases/apps.nebula-graph.io_nebulaclusters.yaml @@ -2449,6 +2449,18 @@ spec: type: string type: object type: array + topologySpreadConstraints: + items: + properties: + topologyKey: + type: string + whenUnsatisfiable: + type: string + required: + - topologyKey + - whenUnsatisfiable + type: object + type: array version: default: latest type: string @@ -5199,6 +5211,18 @@ spec: type: string type: object type: array + topologySpreadConstraints: + items: + properties: + topologyKey: + type: string + whenUnsatisfiable: + type: string + required: + - topologyKey + - whenUnsatisfiable + type: object + type: array version: default: latest type: string @@ -8008,6 +8032,18 @@ spec: type: string type: object type: array + topologySpreadConstraints: + items: + properties: + topologyKey: + type: string + whenUnsatisfiable: + type: string + required: + - topologyKey + - whenUnsatisfiable + type: object + type: array version: default: latest type: string @@ -10844,6 +10880,18 @@ spec: type: string type: object type: array + topologySpreadConstraints: + items: + properties: + topologyKey: + type: string + whenUnsatisfiable: + type: string + required: + - topologyKey + - whenUnsatisfiable + type: object + type: array version: default: latest type: string diff --git a/pkg/controller/component/storaged_failover.go b/pkg/controller/component/storaged_failover.go index 4ba83f03..16a3e799 100644 --- a/pkg/controller/component/storaged_failover.go +++ b/pkg/controller/component/storaged_failover.go @@ -290,12 +290,12 @@ func (s *storagedFailover) checkPendingPod(nc *v1alpha1.NebulaCluster) error { } } } - if isPodConditionScheduledTrue(pod.Status.Conditions) && isPodPending(pod) { + if isPodConditionScheduledTrue(pod.Status.Conditions) && isPodPending(pod) && time.Now().After(pod.CreationTimestamp.Add(time.Minute*1)) { klog.Infof("storagd pod [%s/%s] conditions %v", nc.Namespace, podName, pod.Status.Conditions) if err := s.clientSet.Pod().DeletePod(nc.Namespace, podName, true); err != nil { return err } - return utilerrors.ReconcileErrorf("waiting for pending storaged pod [%s/%s] deleted", nc.Namespace, podName) + return utilerrors.ReconcileErrorf("pending storaged pod [%s/%s] deleted, reschedule", nc.Namespace, podName) } } return nil