Skip to content

Commit

Permalink
Support preDelete lifecycle for Advanced DaemonSet
Browse files Browse the repository at this point in the history
Signed-off-by: FillZpp <[email protected]>
  • Loading branch information
FillZpp committed Mar 9, 2022
1 parent 2953c6c commit 4b8edbf
Show file tree
Hide file tree
Showing 15 changed files with 333 additions and 88 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ We encourage contributors to follow the [PR template](./.github/PULL_REQUEST_TEM
As a contributor, if you want to make any contribution to Kruise project, we should reach an agreement on the version of tools used in the development environment.
Here are some dependents with specific version:

- Golang : v1.15+ (1.17 is best)
- Golang : v1.17+
- Kubernetes: v1.16+

### Developing guide
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ all: build
##@ Development

go_check:
@scripts/check_go_version "1.15.0"
@scripts/check_go_version "1.17.0"

generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations.
@scripts/generate_client.sh
Expand Down
6 changes: 6 additions & 0 deletions apis/apps/v1alpha1/daemonset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package v1alpha1

import (
appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -156,6 +157,11 @@ type DaemonSetSpec struct {
// Defaults to 10.
// +optional
RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"`

// Lifecycle defines the lifecycle hooks for Pods pre-delete, in-place update.
// Currently, we only support pre-delete hook for Advanced DaemonSet.
// +optional
Lifecycle *appspub.Lifecycle `json:"lifecycle,omitempty"`
}

// DaemonSetStatus defines the observed state of DaemonSet
Expand Down
5 changes: 5 additions & 0 deletions apis/apps/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 31 additions & 0 deletions config/crd/bases/apps.kruise.io_daemonsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,37 @@ spec:
description: BurstReplicas is a rate limiter for booting pods on a
lot of pods. The default value is 250
x-kubernetes-int-or-string: true
lifecycle:
description: Lifecycle defines the lifecycle hooks for Pods pre-delete,
in-place update. Currently, we only support pre-delete hook for
Advanced DaemonSet.
properties:
inPlaceUpdate:
description: InPlaceUpdate is the hook before Pod to update and
after Pod has been updated.
properties:
finalizersHandler:
items:
type: string
type: array
labelsHandler:
additionalProperties:
type: string
type: object
type: object
preDelete:
description: PreDelete is the hook before Pod to be deleted.
properties:
finalizersHandler:
items:
type: string
type: array
labelsHandler:
additionalProperties:
type: string
type: object
type: object
type: object
minReadySeconds:
description: The minimum number of seconds for which a newly created
DaemonSet pod should be ready without any of its container crashing,
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/cloneset/sync/cloneset_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/openkruise/kruise/pkg/util/updatesort"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -255,6 +256,7 @@ func (c *realControl) updatePod(cs *appsv1alpha1.CloneSet, coreControl clonesetc
if res.InPlaceUpdate {
if res.UpdateErr == nil {
c.recorder.Eventf(cs, v1.EventTypeNormal, "SuccessfulUpdatePodInPlace", "successfully update pod %s in-place(revision %v)", pod.Name, updateRevision.Name)
clonesetutils.ResourceVersionExpectations.Expect(&metav1.ObjectMeta{UID: pod.UID, ResourceVersion: res.NewResourceVersion})
clonesetutils.UpdateExpectations.ExpectUpdated(clonesetutils.GetControllerKey(cs), updateRevision.Name, pod)
return res.DelayDuration, nil
}
Expand Down
150 changes: 95 additions & 55 deletions pkg/controller/daemonset/daemonset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import (
utildiscovery "github.com/openkruise/kruise/pkg/util/discovery"
kruiseExpectations "github.com/openkruise/kruise/pkg/util/expectations"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
"github.com/openkruise/kruise/pkg/util/lifecycle"
"github.com/openkruise/kruise/pkg/util/ratelimiter"
"github.com/openkruise/kruise/pkg/util/requeueduration"
"github.com/openkruise/kruise/pkg/util/revisionadapter"
Expand Down Expand Up @@ -176,15 +177,16 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
crControl: kubecontroller.RealControllerRevisionControl{
KubeClient: genericClient.KubeClient,
},
expectations: kubecontroller.NewControllerExpectations(),
updateExpectations: kruiseExpectations.NewUpdateExpectations(revisionadapter.NewDefaultImpl()),
dsLister: dsLister,
historyLister: historyLister,
podLister: podLister,
nodeLister: nodeLister,
failedPodsBackoff: failedPodsBackoff,
inplaceControl: inplaceupdate.New(cli, revisionAdapter),
revisionAdapter: revisionAdapter,
lifecycleControl: lifecycle.New(cli),
expectations: kubecontroller.NewControllerExpectations(),
resourceVersionExpectations: kruiseExpectations.NewResourceVersionExpectation(),
dsLister: dsLister,
historyLister: historyLister,
podLister: podLister,
nodeLister: nodeLister,
failedPodsBackoff: failedPodsBackoff,
inplaceControl: inplaceupdate.New(cli, revisionAdapter),
revisionAdapter: revisionAdapter,
}
return dsc, err
}
Expand Down Expand Up @@ -213,7 +215,6 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
newDS := e.ObjectNew.(*appsv1alpha1.DaemonSet)
if oldDS.UID != newDS.UID {
dsc.expectations.DeleteExpectations(keyFunc(oldDS))
dsc.updateExpectations.DeleteExpectations(keyFunc(oldDS))
}
klog.V(4).Infof("Updating DaemonSet %s/%s", newDS.Namespace, newDS.Name)
return true
Expand All @@ -222,7 +223,6 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
ds := e.Object.(*appsv1alpha1.DaemonSet)
klog.V(4).Infof("Deleting DaemonSet %s/%s", ds.Namespace, ds.Name)
dsc.expectations.DeleteExpectations(keyFunc(ds))
dsc.updateExpectations.DeleteExpectations(keyFunc(ds))
return true
},
})
Expand Down Expand Up @@ -252,16 +252,17 @@ var _ reconcile.Reconciler = &ReconcileDaemonSet{}

// ReconcileDaemonSet reconciles a DaemonSet object
type ReconcileDaemonSet struct {
kubeClient clientset.Interface
kruiseClient kruiseclientset.Interface
eventRecorder record.EventRecorder
podControl kubecontroller.PodControlInterface
crControl kubecontroller.ControllerRevisionControlInterface
kubeClient clientset.Interface
kruiseClient kruiseclientset.Interface
eventRecorder record.EventRecorder
podControl kubecontroller.PodControlInterface
crControl kubecontroller.ControllerRevisionControlInterface
lifecycleControl lifecycle.Interface

// A TTLCache of pod creates/deletes each ds expects to see
expectations kubecontroller.ControllerExpectationsInterface
// A cache of pod revisions for in-place update
updateExpectations kruiseExpectations.UpdateExpectations
// A cache of pod resourceVersion expecatations
resourceVersionExpectations kruiseExpectations.ResourceVersionExpectation

// dsLister can list/get daemonsets from the shared informer's store
dsLister kruiseappslisters.DaemonSetLister
Expand Down Expand Up @@ -343,7 +344,6 @@ func (dsc *ReconcileDaemonSet) syncDaemonSet(request reconcile.Request) error {
if errors.IsNotFound(err) {
klog.V(4).Infof("DaemonSet has been deleted %s", dsKey)
dsc.expectations.DeleteExpectations(dsKey)
dsc.updateExpectations.DeleteExpectations(dsKey)
return nil
}
return fmt.Errorf("unable to retrieve DaemonSet %s from store: %v", dsKey, err)
Expand Down Expand Up @@ -374,26 +374,27 @@ func (dsc *ReconcileDaemonSet) syncDaemonSet(request reconcile.Request) error {
}
hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]

if !dsc.expectations.SatisfiedExpectations(dsKey) {
if !dsc.expectations.SatisfiedExpectations(dsKey) || !dsc.hasPodExpectationsSatisfied(ds) {
return dsc.updateDaemonSetStatus(ds, nodeList, hash, false)
}

err = dsc.manage(ds, nodeList, hash)
if err != nil {
return err
}

// return and wait next reconcile if expectation changed to unsatisfied
if !dsc.expectations.SatisfiedExpectations(dsKey) {
if !dsc.expectations.SatisfiedExpectations(dsKey) || !dsc.hasPodExpectationsSatisfied(ds) {
return dsc.updateDaemonSetStatus(ds, nodeList, hash, false)
}

updateSatisfied, err := dsc.refreshUpdateStatesAndGetSatisfied(ds, hash)
if err != nil {
if err := dsc.refreshUpdateStates(ds); err != nil {
return err
}

// Process rolling updates if we're ready. For all kinds of update should not be executed if the update
// expectation is not satisfied.
if updateSatisfied && !isDaemonSetPaused(ds) {
if !isDaemonSetPaused(ds) {
switch ds.Spec.UpdateStrategy.Type {
case appsv1alpha1.OnDeleteDaemonSetStrategyType:
case appsv1alpha1.RollingUpdateDaemonSetStrategyType:
Expand Down Expand Up @@ -611,6 +612,14 @@ func (dsc *ReconcileDaemonSet) manage(ds *appsv1alpha1.DaemonSet, nodeList []*co
// syncNodes deletes given pods and creates new daemon set pods on the given nodes
// returns slice with erros if any
func (dsc *ReconcileDaemonSet) syncNodes(ds *appsv1alpha1.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
if ds.Spec.Lifecycle != nil && ds.Spec.Lifecycle.PreDelete != nil {
var err error
podsToDelete, err = dsc.syncWithPreparingDelete(ds, podsToDelete)
if err != nil {
return err
}
}

dsKey := keyFunc(ds)
createDiff := len(nodesNeedingDaemonPods)
deleteDiff := len(podsToDelete)
Expand Down Expand Up @@ -737,6 +746,28 @@ func (dsc *ReconcileDaemonSet) syncNodes(ds *appsv1alpha1.DaemonSet, podsToDelet
return utilerrors.NewAggregate(errors)
}

func (dsc *ReconcileDaemonSet) syncWithPreparingDelete(ds *appsv1alpha1.DaemonSet, podsToDelete []string) (podsCanDelete []string, err error) {
for _, podName := range podsToDelete {
pod, err := dsc.podLister.Pods(ds.Namespace).Get(podName)
if errors.IsNotFound(err) {
continue
} else if err != nil {
return nil, err
}
if !lifecycle.IsPodHooked(ds.Spec.Lifecycle.PreDelete, pod) {
podsCanDelete = append(podsCanDelete, podName)
continue
}
if updated, gotPod, err := dsc.lifecycleControl.UpdatePodLifecycle(pod, appspub.LifecycleStatePreparingDelete); err != nil {
return nil, err
} else if updated {
klog.V(3).Infof("DaemonSet %s/%s has marked Pod %s as PreparingDelete", ds.Namespace, ds.Name, podName)
dsc.resourceVersionExpectations.Expect(gotPod)
}
}
return
}

// podsShouldBeOnNode figures out the DaemonSet pods to be created and deleted on the given node:
// - nodesNeedingDaemonPods: the pods need to start on the node
// - podsToDelete: the Pods need to be deleted on the node
Expand Down Expand Up @@ -785,6 +816,9 @@ func (dsc *ReconcileDaemonSet) podsShouldBeOnNode(
// Emit an event so that it's discoverable to users.
dsc.eventRecorder.Eventf(ds, corev1.EventTypeWarning, FailedDaemonPodReason, msg)
podsToDelete = append(podsToDelete, pod.Name)
} else if isPodPreDeleting(pod) {
klog.V(3).Infof("Found daemon pod %s/%s on node %s is in PreparingDelete state, will try to kill it", pod.Namespace, pod.Name, node.Name)
podsToDelete = append(podsToDelete, pod.Name)
} else {
daemonPodsRunning = append(daemonPodsRunning, pod)
}
Expand Down Expand Up @@ -949,46 +983,52 @@ func (dsc *ReconcileDaemonSet) cleanupHistory(ds *appsv1alpha1.DaemonSet, old []
return nil
}

func (dsc *ReconcileDaemonSet) refreshUpdateStatesAndGetSatisfied(ds *appsv1alpha1.DaemonSet, hash string) (bool, error) {
func (dsc *ReconcileDaemonSet) refreshUpdateStates(ds *appsv1alpha1.DaemonSet) error {
dsKey := keyFunc(ds)
opts := &inplaceupdate.UpdateOptions{}
opts = inplaceupdate.SetOptionsDefaults(opts)

nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
pods, err := dsc.getDaemonPods(ds)
if err != nil {
return false, fmt.Errorf("couldn't get node to daemon pod mapping for DaemonSet %q: %v", ds.Name, err)
return err
}

for _, pods := range nodeToDaemonPods {
for _, pod := range pods {
dsc.updateExpectations.ObserveUpdated(dsKey, hash, pod)
opts := &inplaceupdate.UpdateOptions{}
opts = inplaceupdate.SetOptionsDefaults(opts)

for _, pod := range pods {
if dsc.inplaceControl == nil {
continue
}
res := dsc.inplaceControl.Refresh(pod, opts)
if res.RefreshErr != nil {
klog.Errorf("DaemonSet %s/%s failed to update pod %s condition for inplace: %v", ds.Namespace, ds.Name, pod.Name, res.RefreshErr)
return res.RefreshErr
}
if res.DelayDuration != 0 {
durationStore.Push(dsKey, res.DelayDuration)
}
}

for _, pods := range nodeToDaemonPods {
for _, pod := range pods {
if dsc.inplaceControl == nil {
continue
}
res := dsc.inplaceControl.Refresh(pod, opts)
if res.RefreshErr != nil {
klog.Errorf("DaemonSet %s/%s failed to update pod %s condition for inplace: %v", ds.Namespace, ds.Name, pod.Name, res.RefreshErr)
return false, res.RefreshErr
}
if res.DelayDuration != 0 {
durationStore.Push(dsKey, res.DelayDuration)
}
}
return nil
}

func (dsc *ReconcileDaemonSet) hasPodExpectationsSatisfied(ds *appsv1alpha1.DaemonSet) bool {
dsKey := keyFunc(ds)
pods, err := dsc.getDaemonPods(ds)
if err != nil {
klog.Errorf("Failed to get pods for DaemonSet")
return false
}

updateSatisfied, unsatisfiedDuration, updateDirtyPods := dsc.updateExpectations.SatisfiedExpectations(dsKey, hash)
if !updateSatisfied {
if unsatisfiedDuration >= kruiseExpectations.ExpectationTimeout {
klog.Warningf("Expectation unsatisfied overtime for %v, updateDirtyPods=%v, timeout=%v", dsKey, updateDirtyPods, unsatisfiedDuration)
} else {
klog.V(5).Infof("Not satisfied update for %v, updateDirtyPods=%v", dsKey, updateDirtyPods)
durationStore.Push(dsKey, kruiseExpectations.ExpectationTimeout-unsatisfiedDuration)
for _, pod := range pods {
dsc.resourceVersionExpectations.Observe(pod)
if isSatisfied, unsatisfiedDuration := dsc.resourceVersionExpectations.IsSatisfied(pod); !isSatisfied {
if unsatisfiedDuration >= kruiseExpectations.ExpectationTimeout {
klog.Errorf("Expectation unsatisfied resourceVersion overtime for %v, wait for pod %v updating, timeout=%v", dsKey, pod.Name, unsatisfiedDuration)
} else {
klog.V(5).Infof("Not satisfied resourceVersion for %v, wait for pod %v updating", dsKey, pod.Name)
durationStore.Push(dsKey, kruiseExpectations.ExpectationTimeout-unsatisfiedDuration)
}
return false
}
}
return updateSatisfied, nil
return true
}
17 changes: 9 additions & 8 deletions pkg/controller/daemonset/daemonset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
kruiseinformers "github.com/openkruise/kruise/pkg/client/informers/externalversions"
kruiseappsinformers "github.com/openkruise/kruise/pkg/client/informers/externalversions/apps/v1alpha1"
kruiseExpectations "github.com/openkruise/kruise/pkg/util/expectations"
"github.com/openkruise/kruise/pkg/util/revisionadapter"
"github.com/openkruise/kruise/pkg/util/lifecycle"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -255,13 +255,14 @@ func NewDaemonSetController(
crControl: kubecontroller.RealControllerRevisionControl{
KubeClient: kubeClient,
},
expectations: kubecontroller.NewControllerExpectations(),
updateExpectations: kruiseExpectations.NewUpdateExpectations(revisionadapter.NewDefaultImpl()),
dsLister: dsInformer.Lister(),
historyLister: revInformer.Lister(),
podLister: podInformer.Lister(),
nodeLister: nodeInformer.Lister(),
failedPodsBackoff: failedPodsBackoff,
lifecycleControl: lifecycle.NewForInformer(podInformer),
expectations: kubecontroller.NewControllerExpectations(),
resourceVersionExpectations: kruiseExpectations.NewResourceVersionExpectation(),
dsLister: dsInformer.Lister(),
historyLister: revInformer.Lister(),
podLister: podInformer.Lister(),
nodeLister: nodeInformer.Lister(),
failedPodsBackoff: failedPodsBackoff,
}
}

Expand Down
Loading

0 comments on commit 4b8edbf

Please sign in to comment.