Skip to content

Commit

Permalink
enable cloneset deleting pvc when pod hanging
Browse files Browse the repository at this point in the history
Signed-off-by: willise <[email protected]>
  • Loading branch information
willise committed Dec 22, 2022
1 parent 1ece7fe commit df1f121
Show file tree
Hide file tree
Showing 9 changed files with 541 additions and 6 deletions.
4 changes: 4 additions & 0 deletions apis/apps/v1alpha1/cloneset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ type CloneSetScaleStrategy struct {
// The scale will fail if the number of unavailable pods were greater than this MaxUnavailable at scaling up.
// MaxUnavailable works only when scaling up.
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`

// DisablePVCReuse indicates that the cloneset will not reuse already
// existing pvc to rebuild a new pod
DisablePVCReuse bool `json:"disablePVCReuse,omitempty"`
}

// CloneSetUpdateStrategy defines strategies for pods update.
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/apps.kruise.io_clonesets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ spec:
description: ScaleStrategy indicates the ScaleStrategy that will be
employed to create and delete Pods in the CloneSet.
properties:
disablePVCReuse:
description: Indicates that the cloneset will not reuse already
existing pvc to rebuild a new pod
type: boolean
maxUnavailable:
anyOf:
- type: integer
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/apps.kruise.io_uniteddeployments.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,10 @@ spec:
that will be employed to create and delete Pods in the
CloneSet.
properties:
disablePVCReuse:
description: Indicates that the cloneset will not reuse already
existing pvc to rebuild a new pod
type: boolean
maxUnavailable:
anyOf:
- type: integer
Expand Down
115 changes: 115 additions & 0 deletions pkg/controller/cloneset/sync/cloneset_scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,15 @@ import (
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/util"
"github.com/openkruise/kruise/pkg/util/expectations"
"github.com/openkruise/kruise/pkg/util/fieldindex"
"github.com/openkruise/kruise/pkg/util/lifecycle"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
Expand All @@ -60,6 +64,19 @@ func (r *realControl) Scale(
return false, nil
}

// If cloneset doesn't want to reuse pvc, clean up
// the existing pvc first. Then it looks like the pod
// is deleted by controller, new pod can be created.
if updateCS.Spec.ScaleStrategy.DisablePVCReuse {
uselessPVCs := getUselessPVCs(pods, pvcs)
if len(uselessPVCs) > 0 {
klog.V(3).Infof("Begin to clean up cloneset %s useless PVCs", controllerKey)
if modified, err := r.cleanupPVCs(updateCS, uselessPVCs); err != nil || modified {
return modified, err
}
}
}

// 1. manage pods to delete and in preDelete
podsSpecifiedToDelete, podsInPreDelete, numToDelete := getPlannedDeletedPods(updateCS, pods)
if modified, err := r.managePreparingDelete(updateCS, pods, podsInPreDelete, numToDelete); err != nil || modified {
Expand Down Expand Up @@ -403,3 +420,101 @@ func (r *realControl) choosePodsToDelete(cs *appsv1alpha1.CloneSet, totalDiff in

return podsToDelete
}

// getInstanceIDsFromPods returns the set of instance IDs carried by the given
// pods. Pods for which clonesetutils.GetInstanceID yields an empty string are
// skipped.
func getInstanceIDsFromPods(pods []*v1.Pod) sets.String {
	ids := sets.NewString()
	for i := range pods {
		id := clonesetutils.GetInstanceID(pods[i])
		if id == "" {
			continue
		}
		ids.Insert(id)
	}
	return ids
}

// getUselessPVCs returns the PVCs whose instance ID is non-empty and is not
// carried by any of the given pods, keyed by that instance ID.
//
// NOTE(review): because the result is keyed by instance ID, several PVCs that
// share one instance ID collapse into a single entry (the last one seen wins).
// Presumably the remaining ones are picked up by later reconciles - confirm.
func getUselessPVCs(pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) map[string]*v1.PersistentVolumeClaim {
	inUse := getInstanceIDsFromPods(pods)

	orphaned := make(map[string]*v1.PersistentVolumeClaim)
	for _, claim := range pvcs {
		id := clonesetutils.GetInstanceID(claim)
		if id == "" || inUse.Has(id) {
			continue
		}
		orphaned[id] = claim
	}
	return orphaned
}

// cleanupPVCs disposes of the given useless PVCs (keyed by instance ID).
//
// It lists the pods returned by getAllPods for cs. For every such pod whose
// instance ID matches a useless PVC, the PVC's owner reference is moved from
// the cloneset to that pod (and persisted if that changed anything); the PVC
// is then removed from uselessPVCs. Every PVC still left in the map afterwards
// has no matching pod and is deleted directly.
//
// NotFound errors from update/delete are tolerated and still count as a
// modification. Note that uselessPVCs is mutated: entries matched to a pod are
// removed from the caller's map.
//
// It returns whether any PVC was updated or deleted so far, and the first
// non-tolerated error encountered.
func (r *realControl) cleanupPVCs(cs *appsv1alpha1.CloneSet, uselessPVCs map[string]*v1.PersistentVolumeClaim) (bool, error) {
	var modified bool

	pods, err := getAllPods(r.Client, cs)
	if err != nil {
		klog.Errorf("Could not get cloneset %s owned pods", clonesetutils.GetControllerKey(cs))
		return modified, err
	}

	// If useless pvc owner pod does not exist, the pvc can be deleted directly,
	// else update pvc's ownerreference to pod.
	for _, pod := range pods {
		instanceID := clonesetutils.GetInstanceID(pod)
		pvc, ok := uselessPVCs[instanceID]
		if !ok {
			continue
		}
		if updateClaimOwnerRefToPod(pvc, cs, pod) {
			// Do not shadow the accumulated `modified`: on failure the caller
			// must still learn about changes that were already applied.
			if _, err := r.updatePVC(cs, pvc); err != nil && !errors.IsNotFound(err) {
				return modified, err
			}
			modified = true
		}
		// Left pvcs will be deleted directly.
		delete(uselessPVCs, instanceID)
	}

	for _, pvc := range uselessPVCs {
		// It's safe to delete pvc that has no pod found.
		if _, err := r.deletePVC(cs, pvc); err != nil && !errors.IsNotFound(err) {
			return modified, err
		}
		modified = true
	}
	return modified, nil
}

// getAllPods lists the pods in the cloneset's namespace that are selected by
// the owner-reference UID field index for cs.UID. On failure it returns a nil
// slice together with the error from clonesetutils.GetAllPods.
func getAllPods(reader client.Reader, cs *appsv1alpha1.CloneSet) ([]*v1.Pod, error) {
	ownerSelector := fields.SelectorFromSet(fields.Set{
		fieldindex.IndexNameForOwnerRefUID: string(cs.UID),
	})
	listOpts := &client.ListOptions{
		Namespace:     cs.Namespace,
		FieldSelector: ownerSelector,
	}

	owned, err := clonesetutils.GetAllPods(reader, listOpts)
	if err == nil {
		return owned, nil
	}
	return nil, err
}

// updatePVC persists pvc through the control's client. It returns (true, nil)
// on success. On failure it records a "FailedUpdate" warning event on cs and
// returns (false, err).
func (r *realControl) updatePVC(cs *appsv1alpha1.CloneSet, pvc *v1.PersistentVolumeClaim) (bool, error) {
	err := r.Client.Update(context.TODO(), pvc)
	if err == nil {
		return true, nil
	}
	r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedUpdate", "failed to update PVC %s: %v", pvc.Name, err)
	return false, err
}

// deletePVC deletes pvc and returns (true, nil) on success.
//
// A delete expectation for the PVC name is registered under the cloneset's
// controller key before the request is issued. If the request fails, the
// expectation is observed again right away, a "FailedDelete" warning event is
// recorded on cs, and (false, err) is returned.
func (r *realControl) deletePVC(cs *appsv1alpha1.CloneSet, pvc *v1.PersistentVolumeClaim) (bool, error) {
	key := clonesetutils.GetControllerKey(cs)
	clonesetutils.ScaleExpectations.ExpectScale(key, expectations.Delete, pvc.Name)

	err := r.Delete(context.TODO(), pvc)
	if err == nil {
		return true, nil
	}

	// The delete never happened, so release the expectation registered above.
	clonesetutils.ScaleExpectations.ObserveScale(key, expectations.Delete, pvc.Name)
	r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedDelete", "failed to clean up PVC %s: %v", pvc.Name, err)
	return false, err
}

// updateClaimOwnerRefToPod re-parents pvc from the cloneset to the pod: it
// removes the owner reference to cs and sets an owner reference to pod, using
// the pod's TypeMeta after passing it through util.UpdatePodMeta. It reports
// whether either step changed the PVC, i.e. whether the PVC needs to be
// persisted.
func updateClaimOwnerRefToPod(pvc *v1.PersistentVolumeClaim, cs *appsv1alpha1.CloneSet, pod *v1.Pod) bool {
	csRefRemoved := util.RemoveOwnerRef(pvc, cs)

	podMeta := &pod.TypeMeta
	util.UpdatePodMeta(podMeta)
	podRefSet := util.SetOwnerRef(pvc, pod, podMeta)

	return podRefSet || csRefRemoved
}
162 changes: 162 additions & 0 deletions pkg/controller/cloneset/sync/cloneset_scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -527,3 +528,164 @@ func TestGetOrGenAvailableIDs(t *testing.T) {
t.Fatalf("expected got random id, but actually %v", id)
}
}

// TestDisablePVCReuse exercises cleanupPVCs against a fake control.
//
// Setup: two pods (id1, id3) and three PVCs (id1, id2, id3), all owned by the
// cloneset, are created and then both pods are deleted. Pod id1 carries a
// finalizer, and the test expects it to be the only pod still listed
// afterwards.
//
// Expectations after cleanupPVCs:
//   - a modification is reported without error;
//   - only pod foo-id1 is still listed;
//   - only PVC datadir-foo-id1 is left, and its first owner reference points
//     at pod foo-id1 (Kind "Pod", core API group) instead of the cloneset.
func TestDisablePVCReuse(t *testing.T) {
	cs := &appsv1alpha1.CloneSet{ObjectMeta: metav1.ObjectMeta{
		Namespace: "default",
		Name:      "foo",
		UID:       "test"}}

	boolPtr := func(b bool) *bool { return &b }
	// Each object gets its own owner-reference slice so that mutating one
	// object can never leak into another.
	ownedByCloneSet := func() []metav1.OwnerReference {
		return []metav1.OwnerReference{
			{
				APIVersion:         "apps.kruise.io/v1alpha1",
				Kind:               "CloneSet",
				Name:               "foo",
				UID:                "test",
				Controller:         boolPtr(true),
				BlockOwnerDeletion: boolPtr(true),
			},
		}
	}
	instanceLabels := func(id string) map[string]string {
		return map[string]string{
			appsv1alpha1.CloneSetInstanceID: id,
			"foo":                           "bar",
		}
	}
	newPVC := func(id string) *v1.PersistentVolumeClaim {
		return &v1.PersistentVolumeClaim{
			ObjectMeta: metav1.ObjectMeta{
				Namespace:       "default",
				Name:            "datadir-foo-" + id,
				Labels:          instanceLabels(id),
				OwnerReferences: ownedByCloneSet(),
			},
		}
	}

	podsToDelete := []*v1.Pod{
		{
			ObjectMeta: metav1.ObjectMeta{
				Finalizers:      []string{"a/b"},
				Namespace:       "default",
				Name:            "foo-id1",
				GenerateName:    "foo-",
				Labels:          instanceLabels("id1"),
				OwnerReferences: ownedByCloneSet(),
				UID:             "foo-id1",
			},
		},
		{
			ObjectMeta: metav1.ObjectMeta{
				Namespace:       "default",
				Name:            "foo-id3",
				GenerateName:    "foo-",
				Labels:          instanceLabels("id3"),
				OwnerReferences: ownedByCloneSet(),
				UID:             "foo-id3",
			},
		},
	}

	pvcs := map[string]*v1.PersistentVolumeClaim{
		"id1": newPVC("id1"),
		"id2": newPVC("id2"),
		"id3": newPVC("id3"),
	}

	ctrl := newFakeControl()
	for _, p := range podsToDelete {
		if err := ctrl.Create(context.TODO(), p); err != nil {
			t.Fatalf("failed to create pod %s: %v", p.Name, err)
		}
	}
	for _, p := range pvcs {
		if err := ctrl.Create(context.TODO(), p); err != nil {
			t.Fatalf("failed to create pvc %s: %v", p.Name, err)
		}
	}

	for _, p := range podsToDelete {
		if err := ctrl.Delete(context.TODO(), p); err != nil {
			t.Fatalf("failed to delete pod %s: %v", p.Name, err)
		}
	}

	deleted, err := ctrl.cleanupPVCs(cs, pvcs)
	if err != nil {
		t.Fatalf("failed to delete got pvcs: %v", err)
	}
	if !deleted {
		t.Fatalf("failed to delete got pvcs: not deleted")
	}

	gotPods := v1.PodList{}
	if err := ctrl.List(context.TODO(), &gotPods, client.InNamespace("default")); err != nil {
		t.Fatalf("failed to list pods: %v", err)
	}
	// Compare by name: a DeepEqual between a Pod value and a *Pod can never
	// be true and would make this check vacuous.
	if len(gotPods.Items) != 1 || gotPods.Items[0].Name != podsToDelete[0].Name {
		t.Fatalf("unexpected pods: %v", util.DumpJSON(gotPods.Items))
	}

	gotPVCs := v1.PersistentVolumeClaimList{}
	if err := ctrl.List(context.TODO(), &gotPVCs, client.InNamespace("default")); err != nil {
		t.Fatalf("failed to list pvcs: %v", err)
	}
	// cleanupPVCs removes matched entries from the pvcs map, so the expected
	// name is spelled out instead of being read back from pvcs["id1"].
	if len(gotPVCs.Items) != 1 || gotPVCs.Items[0].Name != "datadir-foo-id1" {
		t.Fatalf("unexpected pvcs: %v", util.DumpJSON(gotPVCs.Items))
	}

	if len(gotPVCs.Items[0].OwnerReferences) == 0 {
		t.Fatalf("unexpected pvcs: %v", util.DumpJSON(gotPVCs.Items))
	}
	ref := gotPVCs.Items[0].OwnerReferences[0]
	refGV, _ := schema.ParseGroupVersion(ref.APIVersion)
	if ref.UID != podsToDelete[0].UID || ref.Kind != "Pod" || refGV.Group != v1.SchemeGroupVersion.Group {
		t.Fatalf("unexpected ownerReferences groud version: %v", ref)
	}
}
Loading

0 comments on commit df1f121

Please sign in to comment.