enable cloneset deleting pvc when pod hanging
sunshuai09 committed Nov 7, 2022
1 parent a639a6f commit 516cda9
Showing 5 changed files with 250 additions and 6 deletions.
4 changes: 4 additions & 0 deletions apis/apps/v1alpha1/cloneset_types.go
@@ -93,6 +93,10 @@ type CloneSetScaleStrategy struct {
// The scale will fail if the number of unavailable pods is greater than this MaxUnavailable at scaling up.
// MaxUnavailable works only when scaling up.
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`

// Indicates whether to disable reusing already existing PVCs when
// a pod is rebuilt; if true, the stale PVCs are deleted first.
DisablePVCReuse bool `json:"disablePVCReuse,omitempty"`
}

// CloneSetUpdateStrategy defines strategies for pods update.
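For readers who want to try the new knob, here is a minimal sketch of opting a CloneSet out of PVC reuse. It assumes ScaleStrategy is embedded by value on CloneSetSpec (as the controller code below suggests); the helper name, object name, and namespace are hypothetical.

import (
	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// newSampleCloneSet is a hypothetical helper showing the new field in use.
func newSampleCloneSet() *appsv1alpha1.CloneSet {
	return &appsv1alpha1.CloneSet{
		ObjectMeta: metav1.ObjectMeta{Name: "sample", Namespace: "default"},
		Spec: appsv1alpha1.CloneSetSpec{
			ScaleStrategy: appsv1alpha1.CloneSetScaleStrategy{
				// Delete stale PVCs instead of reusing them for rebuilt pods.
				DisablePVCReuse: true,
			},
		},
	}
}

With the flag left at its default (false), the controller keeps today's behavior and reuses existing PVCs when a pod is rebuilt.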
4 changes: 4 additions & 0 deletions config/crd/bases/apps.kruise.io_clonesets.yaml
@@ -149,6 +149,10 @@ spec:
description: ScaleStrategy indicates the ScaleStrategy that will be
employed to create and delete Pods in the CloneSet.
properties:
disablePVCReuse:
description: Indicates whether to disable reusing already existing
PVCs when a pod is rebuilt; if true, the stale PVCs are deleted first.
type: boolean
maxUnavailable:
anyOf:
- type: integer
4 changes: 4 additions & 0 deletions config/crd/bases/apps.kruise.io_uniteddeployments.yaml
@@ -617,6 +617,10 @@ spec:
that will be employed to create and delete Pods in the
CloneSet.
properties:
disablePVCReuse:
description: Indicates whether to disable reusing already
existing PVCs when a pod is rebuilt; if true, the stale
PVCs are deleted first.
type: boolean
maxUnavailable:
anyOf:
- type: integer
185 changes: 185 additions & 0 deletions pkg/controller/cloneset/sync/cloneset_scale.go
@@ -29,11 +29,16 @@ import (
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/util"
"github.com/openkruise/kruise/pkg/util/expectations"
"github.com/openkruise/kruise/pkg/util/fieldindex"
"github.com/openkruise/kruise/pkg/util/lifecycle"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
@@ -60,6 +65,17 @@ func (r *realControl) Scale(
return false, nil
}

// If the CloneSet is not allowed to reuse PVCs, clean up the stale
// PVCs first. The pod then looks as if it had been deleted by the
// controller, so a new pod can be created.
if updateCS.Spec.ScaleStrategy.DisablePVCReuse {
uselessPVCs, _ := splitPVCsByInstanceIDs(pods, pvcs)
if len(uselessPVCs) > 0 {
klog.V(3).Infof("Begin to clean up useless PVCs for CloneSet %s", controllerKey)
return r.cleanupPVCs(updateCS, uselessPVCs)
}
}

// 1. manage pods to delete and in preDelete
podsSpecifiedToDelete, podsInPreDelete, numToDelete := getPlannedDeletedPods(updateCS, pods)
if modified, err := r.managePreparingDelete(updateCS, pods, podsInPreDelete, numToDelete); err != nil || modified {
@@ -403,3 +419,172 @@ func (r *realControl) choosePodsToDelete(cs *appsv1alpha1.CloneSet, totalDiff in

return podsToDelete
}

func getInstanceIDsFromPods(pods []*v1.Pod) sets.String {
ins := sets.NewString()
for _, pod := range pods {
ins.Insert(pod.Labels[appsv1alpha1.CloneSetInstanceID])
}
return ins
}

func splitPVCsByInstanceIDs(pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) (useless, using []*v1.PersistentVolumeClaim) {
activeIDs := getInstanceIDsFromPods(pods)
for _, pvc := range pvcs {
if activeIDs.Has(pvc.Labels[appsv1alpha1.CloneSetInstanceID]) {
using = append(using, pvc)
} else {
useless = append(useless, pvc)
}
}
return useless, using
}

func (r *realControl) cleanupPVCs(cs *appsv1alpha1.CloneSet, uselessPVCs []*v1.PersistentVolumeClaim) (bool, error) {
var modified bool

pods, err := getInactivePods(r.Client, cs)
if err != nil {
klog.Errorf("Failed to list inactive pods owned by CloneSet %s: %v", clonesetutils.GetControllerKey(cs), err)
return modified, err
}

// There are two scenarios for cleaning up a PVC:
// 1. The PVC belongs to a pod in terminating state: re-point the PVC's
// ownerReference from the CloneSet to the pod, so that the PVC is
// garbage-collected together with the pod.
// 2. The PVC belongs to an already cleaned-up pod, or to a pod in
// Succeeded/Failed state: delete the PVC directly.
for _, pvc := range uselessPVCs {
isTerminating := false

for _, pod := range pods {
if clonesetutils.IsPVCAndPodRelated(pvc, pod) {
if clonesetutils.IsPodTerminating(pod) {
isTerminating = true

if updateClaimOwnerRefToPod(pvc, cs, pod) {
if _, err := r.updatePVC(cs, pvc); err != nil {
return modified, err
}
modified = true
}
}
// A PVC can belong to only one pod, so the remaining pods can be skipped.
break
}
}

if !isTerminating {
// It is safe to delete a PVC whose pod no longer exists or is in
// Succeeded/Failed state.
if _, err := r.deletePVC(cs, pvc); err != nil {
return modified, err
}
modified = true
}
}
return modified, nil
}

// removeOwnerRef drops owner from target's ownerReferences and reports whether anything changed.
func removeOwnerRef(target, owner metav1.Object) bool {
if !hasOwnerRef(target, owner) {
return false
}
ownerUID := owner.GetUID()
oldRefs := target.GetOwnerReferences()
newRefs := make([]metav1.OwnerReference, len(oldRefs)-1)
skip := 0
for i := range oldRefs {
if oldRefs[i].UID == ownerUID {
skip = -1
} else {
newRefs[i+skip] = oldRefs[i]
}
}
target.SetOwnerReferences(newRefs)
return true
}

// hasOwnerRef reports whether target already has an ownerReference pointing at owner.
func hasOwnerRef(target, owner metav1.Object) bool {
ownerUID := owner.GetUID()
for _, ownerRef := range target.GetOwnerReferences() {
if ownerRef.UID == ownerUID {
return true
}
}
return false
}

// setOwnerRef adds an ownerReference to owner on target and reports whether anything changed.
func setOwnerRef(target, owner metav1.Object, ownerType *metav1.TypeMeta) bool {
if hasOwnerRef(target, owner) {
return false
}
ownerRefs := append(
target.GetOwnerReferences(),
metav1.OwnerReference{
APIVersion: ownerType.APIVersion,
Kind: ownerType.Kind,
Name: owner.GetName(),
UID: owner.GetUID(),
})
target.SetOwnerReferences(ownerRefs)
return true
}

func getInactivePods(reader client.Reader, cs *appsv1alpha1.CloneSet) ([]*v1.Pod, error) {
opts := &client.ListOptions{
Namespace: cs.Namespace,
FieldSelector: fields.SelectorFromSet(fields.Set{fieldindex.IndexNameForOwnerRefUID: string(cs.UID)}),
}
// Including pods in terminating state
pods, err := clonesetutils.GetInactivePods(reader, opts)
if err != nil {
return nil, err
}
return pods, nil
}

func (r *realControl) updatePVC(cs *appsv1alpha1.CloneSet, pvc *v1.PersistentVolumeClaim) (bool, error) {
var modified bool
if err := r.Client.Update(context.TODO(), pvc); err != nil {
r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedUpdate", "failed to update PVC %s: %v", pvc.Name, err)
return modified, err
}
return true, nil
}

func (r *realControl) deletePVC(cs *appsv1alpha1.CloneSet, pvc *v1.PersistentVolumeClaim) (bool, error) {
var modified bool
clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)
if err := r.Delete(context.TODO(), pvc); err != nil {
clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)
r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedDelete", "failed to clean up PVC %s: %v", pvc.Name, err)
return modified, err
}
return true, nil
}

// updateClaimOwnerRefToPod re-points the PVC's ownerReference from the
// CloneSet to the given pod and reports whether the PVC was changed.
func updateClaimOwnerRefToPod(pvc *v1.PersistentVolumeClaim, cs *appsv1alpha1.CloneSet, pod *v1.Pod) bool {
needsUpdate := false
updateMeta := func(tm *metav1.TypeMeta) {
// TypeMeta of objects fetched from the informer cache is often empty,
// so fall back to the canonical values for a core/v1 Pod.
if tm.APIVersion == "" {
tm.APIVersion = "v1"
}
if tm.Kind == "" {
tm.Kind = "Pod"
}
}

needsUpdate = removeOwnerRef(pvc, cs)
podMeta := &pod.TypeMeta
updateMeta(podMeta)
return setOwnerRef(pvc, pod, podMeta) || needsUpdate
}
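To make the hand-off above concrete, here is a minimal sketch of what updateClaimOwnerRefToPod does to a PVC whose pod is terminating. It assumes it sits in this same sync package; the names, UIDs, and the demo function itself are fabricated for illustration.

import (
	"fmt"
	"time"

	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func demoOwnerRefHandOff() {
	cs := &appsv1alpha1.CloneSet{ObjectMeta: metav1.ObjectMeta{Name: "sample", UID: "cs-uid"}}
	deletion := metav1.NewTime(time.Now())
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name:              "sample-x2fd5",
		UID:               "pod-uid",
		DeletionTimestamp: &deletion, // the pod is terminating
	}}
	pvc := &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{
		Name: "data-sample-x2fd5",
		OwnerReferences: []metav1.OwnerReference{{
			APIVersion: "apps.kruise.io/v1alpha1",
			Kind:       "CloneSet",
			Name:       cs.Name,
			UID:        cs.UID,
		}},
	}}

	updateClaimOwnerRefToPod(pvc, cs, pod)
	// The CloneSet ownerReference is replaced by a Pod one, so the garbage
	// collector deletes the PVC once the terminating pod is finally gone.
	fmt.Println(pvc.OwnerReferences[0].Kind) // Pod
}

Re-pointing the ownerReference instead of deleting the PVC outright presumably avoids racing with the kubelet while the pod's volumes are still being torn down: the garbage collector only removes the claim after the pod object disappears.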
59 changes: 53 additions & 6 deletions pkg/controller/cloneset/utils/cloneset_utils.go
@@ -24,7 +24,6 @@ import (

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/features"
- utilclient "github.com/openkruise/kruise/pkg/util/client"
"github.com/openkruise/kruise/pkg/util/expectations"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/requeueduration"
@@ -93,22 +92,53 @@ func GetControllerKey(cs *appsv1alpha1.CloneSet) string {

// GetActivePods returns all active pods in this namespace.
func GetActivePods(reader client.Reader, opts *client.ListOptions) ([]*v1.Pod, error) {
- podList := &v1.PodList{}
- if err := reader.List(context.TODO(), podList, opts, utilclient.DisableDeepCopy); err != nil {
+ podList, err := GetAllPods(reader, opts)
+ if err != nil {
return nil, err
}

// Ignore inactive pods
var activePods []*v1.Pod
- for i, pod := range podList.Items {
+ for i, pod := range podList {
+ // Consider all rebuilt pods as active; they should not be recreated
- if kubecontroller.IsPodActive(&pod) {
- activePods = append(activePods, &podList.Items[i])
+ if kubecontroller.IsPodActive(pod) {
+ activePods = append(activePods, podList[i])
}
}
return activePods, nil
}

// GetInactivePods returns all inactive pods in this namespace, including pods in terminating state.
func GetInactivePods(reader client.Reader, opts *client.ListOptions) ([]*v1.Pod, error) {
podList, err := GetAllPods(reader, opts)
if err != nil {
return nil, err
}

var inactivePods []*v1.Pod
for i, pod := range podList {
// Collect pods that are no longer active, i.e. terminating or in Succeeded/Failed phase
if !kubecontroller.IsPodActive(pod) {
inactivePods = append(inactivePods, podList[i])
}
}
return inactivePods, nil
}

// GetAllPods returns all pods in this namespace.
func GetAllPods(reader client.Reader, opts *client.ListOptions) ([]*v1.Pod, error) {
podList := &v1.PodList{}
if err := reader.List(context.TODO(), podList, opts); err != nil {
return nil, err
}

var pods []*v1.Pod
for i := range podList.Items {
pods = append(pods, &podList.Items[i])
}
return pods, nil
}

// NextRevision finds the next valid revision number based on revisions. If the length of revisions
// is 0 this is 1. Otherwise, it is 1 greater than the largest revision's Revision. This method
// assumes that revisions has been sorted by Revision.
@@ -166,6 +196,14 @@ func UpdateStorage(cs *appsv1alpha1.CloneSet, pod *v1.Pod) {
pod.Spec.Volumes = newVolumes
}

// IsPodTerminating reports whether the pod is in terminating state.
func IsPodTerminating(pod *v1.Pod) bool {
return pod.DeletionTimestamp != nil
}

// GetPersistentVolumeClaims gets a map of PersistentVolumeClaims to their template names, as defined in set. The
returned PersistentVolumeClaims are each constructed with the name specific to the Pod. This name is determined
// by getPersistentVolumeClaimName.
@@ -233,3 +271,12 @@ func DoItSlowly(count int, initialBatchSize int, fn func() error) (int, error) {
}
return successes, nil
}

// IsPVCAndPodRelated reports whether the PVC and the pod carry the same
// CloneSet instance ID.
func IsPVCAndPodRelated(pvc *v1.PersistentVolumeClaim, pod *v1.Pod) bool {
pvcIns := pvc.Labels[appsv1alpha1.CloneSetInstanceID]
podIns := pod.Labels[appsv1alpha1.CloneSetInstanceID]
if pvcIns == "" || podIns == "" {
return false
}
return pvcIns == podIns
}
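As a quick illustration of the instance-ID convention these helpers rely on: a pod and a PVC are related only when both carry the same CloneSetInstanceID label. The label values and the demo function are fabricated.

import (
	"fmt"

	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func demoInstanceIDMatch() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
		Labels: map[string]string{appsv1alpha1.CloneSetInstanceID: "x2fd5"},
	}}
	pvc := &v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{
		Labels: map[string]string{appsv1alpha1.CloneSetInstanceID: "x2fd5"},
	}}
	fmt.Println(IsPVCAndPodRelated(pvc, pod)) // true: instance IDs match

	pvc.Labels[appsv1alpha1.CloneSetInstanceID] = "zzzzz"
	fmt.Println(IsPVCAndPodRelated(pvc, pod)) // false: the PVC is now "useless"
}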
