Support preDelete lifecycle for Advanced DaemonSet #923

Merged
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -88,7 +88,7 @@ We encourage contributors to follow the [PR template](./.github/PULL_REQUEST_TEM
As a contributor, if you want to make any contribution to the Kruise project, we should reach an agreement on the versions of the tools used in the development environment.
Here are some dependencies with specific versions:

- - Golang : v1.15+ (1.17 is best)
+ - Golang : v1.17+
- Kubernetes: v1.16+

### Developing guide
2 changes: 1 addition & 1 deletion Makefile
@@ -22,7 +22,7 @@ all: build
##@ Development

go_check:
@scripts/check_go_version "1.15.0"
@scripts/check_go_version "1.17.0"

generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations.
@scripts/generate_client.sh
6 changes: 6 additions & 0 deletions apis/apps/v1alpha1/daemonset_types.go
@@ -17,6 +17,7 @@ limitations under the License.
package v1alpha1

import (
appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -156,6 +157,11 @@ type DaemonSetSpec struct {
// Defaults to 10.
// +optional
RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"`

+ // Lifecycle defines the lifecycle hooks for Pods pre-delete, in-place update.
+ // Currently, we only support pre-delete hook for Advanced DaemonSet.
+ // +optional
+ Lifecycle *appspub.Lifecycle `json:"lifecycle,omitempty"`
}

// DaemonSetStatus defines the observed state of DaemonSet
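For context, here is a minimal sketch of how a user might enable the new hook on an Advanced DaemonSet. The `Lifecycle`/`LifecycleHook` types come from `appspub` as referenced above; the label key is a hypothetical example, not part of this PR:

```go
package example

import (
	appspub "github.com/openkruise/kruise/apis/apps/pub"
	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
)

func exampleSpec() appsv1alpha1.DaemonSetSpec {
	return appsv1alpha1.DaemonSetSpec{
		Lifecycle: &appspub.Lifecycle{
			// Pods carrying this label are "hooked": instead of being deleted
			// outright, the controller moves them to PreparingDelete and waits
			// until the label is removed by some external component.
			PreDelete: &appspub.LifecycleHook{
				LabelsHandler: map[string]string{
					"example.com/block-deletion": "true", // hypothetical hook label
				},
			},
		},
	}
}
```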
5 changes: 5 additions & 0 deletions apis/apps/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

31 changes: 31 additions & 0 deletions config/crd/bases/apps.kruise.io_daemonsets.yaml
@@ -76,6 +76,37 @@ spec:
description: BurstReplicas is a rate limiter for booting pods on a
lot of pods. The default value is 250
x-kubernetes-int-or-string: true
+ lifecycle:
+   description: Lifecycle defines the lifecycle hooks for Pods pre-delete,
+     in-place update. Currently, we only support pre-delete hook for
+     Advanced DaemonSet.
+   properties:
+     inPlaceUpdate:
+       description: InPlaceUpdate is the hook before Pod to update and
+         after Pod has been updated.
+       properties:
+         finalizersHandler:
+           items:
+             type: string
+           type: array
+         labelsHandler:
+           additionalProperties:
+             type: string
+           type: object
+       type: object
+     preDelete:
+       description: PreDelete is the hook before Pod to be deleted.
+       properties:
+         finalizersHandler:
+           items:
+             type: string
+           type: array
+         labelsHandler:
+           additionalProperties:
+             type: string
+           type: object
+       type: object
+   type: object
minReadySeconds:
description: The minimum number of seconds for which a newly created
DaemonSet pod should be ready without any of its container crashing,
2 changes: 2 additions & 0 deletions pkg/controller/cloneset/sync/cloneset_update.go
@@ -37,6 +37,7 @@ import (
"github.com/openkruise/kruise/pkg/util/updatesort"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
)

@@ -255,6 +256,7 @@ func (c *realControl) updatePod(cs *appsv1alpha1.CloneSet, coreControl clonesetc
if res.InPlaceUpdate {
if res.UpdateErr == nil {
c.recorder.Eventf(cs, v1.EventTypeNormal, "SuccessfulUpdatePodInPlace", "successfully update pod %s in-place(revision %v)", pod.Name, updateRevision.Name)
+ clonesetutils.ResourceVersionExpectations.Expect(&metav1.ObjectMeta{UID: pod.UID, ResourceVersion: res.NewResourceVersion})
Review comment (Member): do we still need UpdateExpectations here?

Reply (Member Author): No, I will remove it in next PR.

clonesetutils.UpdateExpectations.ExpectUpdated(clonesetutils.GetControllerKey(cs), updateRevision.Name, pod)
return res.DelayDuration, nil
}
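The `Expect` call above records the resourceVersion returned by the in-place update, so later reconciles can tell whether the informer cache has caught up before trusting it. A minimal sketch of that cycle, with the method set inferred from the calls in this diff (not the exact interface definition):

```go
package example

import (
	"time"

	kruiseExpectations "github.com/openkruise/kruise/pkg/util/expectations"
	v1 "k8s.io/api/core/v1"
)

// gateOnCache returns how long to requeue when the informer cache is still
// older than a Pod we recently wrote, or ok=true once it has caught up.
// Expect() is assumed to have been called right after the write.
func gateOnCache(exp kruiseExpectations.ResourceVersionExpectation, cached *v1.Pod) (requeueAfter time.Duration, ok bool) {
	exp.Observe(cached) // feed in what the cache currently holds
	if satisfied, unsatisfied := exp.IsSatisfied(cached); !satisfied {
		if unsatisfied < kruiseExpectations.ExpectationTimeout {
			// Cache still behind: requeue and wait for the watch event
			// instead of syncing with stale data.
			return kruiseExpectations.ExpectationTimeout - unsatisfied, false
		}
		// Overtime: log and stop waiting rather than block forever.
		return 0, false
	}
	return 0, true
}
```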
150 changes: 95 additions & 55 deletions pkg/controller/daemonset/daemonset_controller.go
@@ -69,6 +69,7 @@ import (
utildiscovery "github.com/openkruise/kruise/pkg/util/discovery"
kruiseExpectations "github.com/openkruise/kruise/pkg/util/expectations"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
"github.com/openkruise/kruise/pkg/util/lifecycle"
"github.com/openkruise/kruise/pkg/util/ratelimiter"
"github.com/openkruise/kruise/pkg/util/requeueduration"
"github.com/openkruise/kruise/pkg/util/revisionadapter"
@@ -176,15 +177,16 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
crControl: kubecontroller.RealControllerRevisionControl{
KubeClient: genericClient.KubeClient,
},
- expectations:       kubecontroller.NewControllerExpectations(),
- updateExpectations: kruiseExpectations.NewUpdateExpectations(revisionadapter.NewDefaultImpl()),
- dsLister:           dsLister,
- historyLister:      historyLister,
- podLister:          podLister,
- nodeLister:         nodeLister,
- failedPodsBackoff:  failedPodsBackoff,
- inplaceControl:     inplaceupdate.New(cli, revisionAdapter),
- revisionAdapter:    revisionAdapter,
+ lifecycleControl:            lifecycle.New(cli),
+ expectations:                kubecontroller.NewControllerExpectations(),
+ resourceVersionExpectations: kruiseExpectations.NewResourceVersionExpectation(),
+ dsLister:                    dsLister,
+ historyLister:               historyLister,
+ podLister:                   podLister,
+ nodeLister:                  nodeLister,
+ failedPodsBackoff:           failedPodsBackoff,
+ inplaceControl:              inplaceupdate.New(cli, revisionAdapter),
+ revisionAdapter:             revisionAdapter,
}
return dsc, err
}
@@ -213,7 +215,6 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
newDS := e.ObjectNew.(*appsv1alpha1.DaemonSet)
if oldDS.UID != newDS.UID {
dsc.expectations.DeleteExpectations(keyFunc(oldDS))
- dsc.updateExpectations.DeleteExpectations(keyFunc(oldDS))
}
klog.V(4).Infof("Updating DaemonSet %s/%s", newDS.Namespace, newDS.Name)
return true
@@ -222,7 +223,6 @@
ds := e.Object.(*appsv1alpha1.DaemonSet)
klog.V(4).Infof("Deleting DaemonSet %s/%s", ds.Namespace, ds.Name)
dsc.expectations.DeleteExpectations(keyFunc(ds))
- dsc.updateExpectations.DeleteExpectations(keyFunc(ds))
return true
},
})
@@ -252,16 +252,17 @@ var _ reconcile.Reconciler = &ReconcileDaemonSet{}

// ReconcileDaemonSet reconciles a DaemonSet object
type ReconcileDaemonSet struct {
- kubeClient    clientset.Interface
- kruiseClient  kruiseclientset.Interface
- eventRecorder record.EventRecorder
- podControl    kubecontroller.PodControlInterface
- crControl     kubecontroller.ControllerRevisionControlInterface
+ kubeClient       clientset.Interface
+ kruiseClient     kruiseclientset.Interface
+ eventRecorder    record.EventRecorder
+ podControl       kubecontroller.PodControlInterface
+ crControl        kubecontroller.ControllerRevisionControlInterface
+ lifecycleControl lifecycle.Interface

// A TTLCache of pod creates/deletes each ds expects to see
expectations kubecontroller.ControllerExpectationsInterface
- // A cache of pod revisions for in-place update
- updateExpectations kruiseExpectations.UpdateExpectations
+ // A cache of pod resourceVersion expectations
+ resourceVersionExpectations kruiseExpectations.ResourceVersionExpectation

// dsLister can list/get daemonsets from the shared informer's store
dsLister kruiseappslisters.DaemonSetLister
@@ -343,7 +344,6 @@ func (dsc *ReconcileDaemonSet) syncDaemonSet(request reconcile.Request) error {
if errors.IsNotFound(err) {
klog.V(4).Infof("DaemonSet has been deleted %s", dsKey)
dsc.expectations.DeleteExpectations(dsKey)
- dsc.updateExpectations.DeleteExpectations(dsKey)
return nil
}
return fmt.Errorf("unable to retrieve DaemonSet %s from store: %v", dsKey, err)
@@ -374,26 +374,27 @@ }
}
hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]

- if !dsc.expectations.SatisfiedExpectations(dsKey) {
+ if !dsc.expectations.SatisfiedExpectations(dsKey) || !dsc.hasPodExpectationsSatisfied(ds) {
return dsc.updateDaemonSetStatus(ds, nodeList, hash, false)
}

err = dsc.manage(ds, nodeList, hash)
if err != nil {
return err
}

// return and wait next reconcile if expectation changed to unsatisfied
- if !dsc.expectations.SatisfiedExpectations(dsKey) {
+ if !dsc.expectations.SatisfiedExpectations(dsKey) || !dsc.hasPodExpectationsSatisfied(ds) {
return dsc.updateDaemonSetStatus(ds, nodeList, hash, false)
}

- updateSatisfied, err := dsc.refreshUpdateStatesAndGetSatisfied(ds, hash)
- if err != nil {
+ if err := dsc.refreshUpdateStates(ds); err != nil {
return err
}

// Process rolling updates if we're ready. No kind of update should be executed
// if the update expectation is not satisfied.
- if updateSatisfied && !isDaemonSetPaused(ds) {
+ if !isDaemonSetPaused(ds) {
switch ds.Spec.UpdateStrategy.Type {
case appsv1alpha1.OnDeleteDaemonSetStrategyType:
case appsv1alpha1.RollingUpdateDaemonSetStrategyType:
@@ -611,6 +612,14 @@ func (dsc *ReconcileDaemonSet) manage(ds *appsv1alpha1.DaemonSet, nodeList []*co
// syncNodes deletes given pods and creates new daemon set pods on the given nodes
// returns slice with errors if any
func (dsc *ReconcileDaemonSet) syncNodes(ds *appsv1alpha1.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
+ if ds.Spec.Lifecycle != nil && ds.Spec.Lifecycle.PreDelete != nil {
+ var err error
+ podsToDelete, err = dsc.syncWithPreparingDelete(ds, podsToDelete)
+ if err != nil {
+ return err
+ }
+ }

dsKey := keyFunc(ds)
createDiff := len(nodesNeedingDaemonPods)
deleteDiff := len(podsToDelete)
@@ -737,6 +746,28 @@ func (dsc *ReconcileDaemonSet) syncNodes(ds *appsv1alpha1.DaemonSet, podsToDelet
return utilerrors.NewAggregate(errors)
}

+ func (dsc *ReconcileDaemonSet) syncWithPreparingDelete(ds *appsv1alpha1.DaemonSet, podsToDelete []string) (podsCanDelete []string, err error) {
+ for _, podName := range podsToDelete {
+ pod, err := dsc.podLister.Pods(ds.Namespace).Get(podName)
+ if errors.IsNotFound(err) {
+ continue
+ } else if err != nil {
+ return nil, err
+ }
+ if !lifecycle.IsPodHooked(ds.Spec.Lifecycle.PreDelete, pod) {
+ podsCanDelete = append(podsCanDelete, podName)
+ continue
+ }
+ if updated, gotPod, err := dsc.lifecycleControl.UpdatePodLifecycle(pod, appspub.LifecycleStatePreparingDelete); err != nil {
+ return nil, err
+ } else if updated {
+ klog.V(3).Infof("DaemonSet %s/%s has marked Pod %s as PreparingDelete", ds.Namespace, ds.Name, podName)
+ dsc.resourceVersionExpectations.Expect(gotPod)
+ }
+ }
+ return
+ }
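Note that nothing in this controller removes the hook itself: syncWithPreparingDelete only flips a hooked Pod to PreparingDelete and filters it out of podsToDelete, so deletion stays blocked until some external component finishes its cleanup and unhooks the Pod. A hypothetical consumer could look like the sketch below; the label key and the drainTraffic cleanup are illustrative, only the appspub state constants come from Kruise:

```go
package example

import (
	"context"

	appspub "github.com/openkruise/kruise/apis/apps/pub"
	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// releaseWhenDrained is a sketch of a user-owned handler that unhooks a Pod
// once pre-delete cleanup is done, allowing the DaemonSet controller to
// actually delete it on the next reconcile.
func releaseWhenDrained(ctx context.Context, c client.Client, pod *v1.Pod) error {
	if pod.Labels[appspub.LifecycleStateKey] != string(appspub.LifecycleStatePreparingDelete) {
		return nil // Pod is not waiting on the pre-delete hook
	}
	if err := drainTraffic(pod); err != nil { // hypothetical user cleanup
		return err
	}
	// Removing the hook label makes lifecycle.IsPodHooked return false, so
	// syncWithPreparingDelete will let the Pod through to deletion.
	delete(pod.Labels, "example.com/block-deletion") // hypothetical hook label
	return c.Update(ctx, pod)
}

func drainTraffic(pod *v1.Pod) error { return nil } // placeholder for real cleanup
```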

// podsShouldBeOnNode figures out the DaemonSet pods to be created and deleted on the given node:
// - nodesNeedingDaemonPods: the pods need to start on the node
// - podsToDelete: the Pods need to be deleted on the node
@@ -785,6 +816,9 @@ func (dsc *ReconcileDaemonSet) podsShouldBeOnNode(
// Emit an event so that it's discoverable to users.
dsc.eventRecorder.Eventf(ds, corev1.EventTypeWarning, FailedDaemonPodReason, msg)
podsToDelete = append(podsToDelete, pod.Name)
+ } else if isPodPreDeleting(pod) {
Review comment (Member): Why do we need to add PreparingDelete pods into podsToDelete? If some logic put the pods into podsToDelete in the first place, is that still the case on the next reconcile?

Reply (Member Author): Yeah, we should put it into podsToDelete even if it has been set to PreparingDelete in previous reconcile.

klog.V(3).Infof("Found daemon pod %s/%s on node %s is in PreparingDelete state, will try to kill it", pod.Namespace, pod.Name, node.Name)
podsToDelete = append(podsToDelete, pod.Name)
} else {
daemonPodsRunning = append(daemonPodsRunning, pod)
}
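The isPodPreDeleting helper is not shown in this diff; presumably it checks the Kruise lifecycle state label on the Pod, along these lines (a sketch, not the PR's actual helper):

```go
package example

import (
	appspub "github.com/openkruise/kruise/apis/apps/pub"
	v1 "k8s.io/api/core/v1"
)

// isPodPreDeleting sketches the likely check: the lifecycle state is carried
// on a well-known Pod label managed by the lifecycle package.
func isPodPreDeleting(pod *v1.Pod) bool {
	return pod != nil && pod.Labels[appspub.LifecycleStateKey] == string(appspub.LifecycleStatePreparingDelete)
}
```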
@@ -949,46 +983,52 @@ func (dsc *ReconcileDaemonSet) cleanupHistory(ds *appsv1alpha1.DaemonSet, old []
return nil
}

- func (dsc *ReconcileDaemonSet) refreshUpdateStatesAndGetSatisfied(ds *appsv1alpha1.DaemonSet, hash string) (bool, error) {
+ func (dsc *ReconcileDaemonSet) refreshUpdateStates(ds *appsv1alpha1.DaemonSet) error {
dsKey := keyFunc(ds)
- opts := &inplaceupdate.UpdateOptions{}
- opts = inplaceupdate.SetOptionsDefaults(opts)

- nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
+ pods, err := dsc.getDaemonPods(ds)
if err != nil {
- return false, fmt.Errorf("couldn't get node to daemon pod mapping for DaemonSet %q: %v", ds.Name, err)
+ return err
}

- for _, pods := range nodeToDaemonPods {
- for _, pod := range pods {
- dsc.updateExpectations.ObserveUpdated(dsKey, hash, pod)
+ opts := &inplaceupdate.UpdateOptions{}
+ opts = inplaceupdate.SetOptionsDefaults(opts)

+ for _, pod := range pods {
+ if dsc.inplaceControl == nil {
+ continue
+ }
+ res := dsc.inplaceControl.Refresh(pod, opts)
+ if res.RefreshErr != nil {
+ klog.Errorf("DaemonSet %s/%s failed to update pod %s condition for inplace: %v", ds.Namespace, ds.Name, pod.Name, res.RefreshErr)
+ return res.RefreshErr
+ }
+ if res.DelayDuration != 0 {
+ durationStore.Push(dsKey, res.DelayDuration)
+ }
+ }

- for _, pods := range nodeToDaemonPods {
- for _, pod := range pods {
- if dsc.inplaceControl == nil {
- continue
- }
- res := dsc.inplaceControl.Refresh(pod, opts)
- if res.RefreshErr != nil {
- klog.Errorf("DaemonSet %s/%s failed to update pod %s condition for inplace: %v", ds.Namespace, ds.Name, pod.Name, res.RefreshErr)
- return false, res.RefreshErr
- }
- if res.DelayDuration != 0 {
- durationStore.Push(dsKey, res.DelayDuration)
- }
- }
+ return nil
}

+ func (dsc *ReconcileDaemonSet) hasPodExpectationsSatisfied(ds *appsv1alpha1.DaemonSet) bool {
+ dsKey := keyFunc(ds)
+ pods, err := dsc.getDaemonPods(ds)
+ if err != nil {
+ klog.Errorf("Failed to get pods for DaemonSet")
+ return false
+ }

- updateSatisfied, unsatisfiedDuration, updateDirtyPods := dsc.updateExpectations.SatisfiedExpectations(dsKey, hash)
- if !updateSatisfied {
- if unsatisfiedDuration >= kruiseExpectations.ExpectationTimeout {
- klog.Warningf("Expectation unsatisfied overtime for %v, updateDirtyPods=%v, timeout=%v", dsKey, updateDirtyPods, unsatisfiedDuration)
- } else {
- klog.V(5).Infof("Not satisfied update for %v, updateDirtyPods=%v", dsKey, updateDirtyPods)
- durationStore.Push(dsKey, kruiseExpectations.ExpectationTimeout-unsatisfiedDuration)
+ for _, pod := range pods {
+ dsc.resourceVersionExpectations.Observe(pod)
+ if isSatisfied, unsatisfiedDuration := dsc.resourceVersionExpectations.IsSatisfied(pod); !isSatisfied {
+ if unsatisfiedDuration >= kruiseExpectations.ExpectationTimeout {
+ klog.Errorf("Expectation unsatisfied resourceVersion overtime for %v, wait for pod %v updating, timeout=%v", dsKey, pod.Name, unsatisfiedDuration)
+ } else {
+ klog.V(5).Infof("Not satisfied resourceVersion for %v, wait for pod %v updating", dsKey, pod.Name)
+ durationStore.Push(dsKey, kruiseExpectations.ExpectationTimeout-unsatisfiedDuration)
+ }
+ return false
+ }
+ }
- return updateSatisfied, nil
+ return true
}
17 changes: 9 additions & 8 deletions pkg/controller/daemonset/daemonset_controller_test.go
@@ -32,7 +32,7 @@ import (
kruiseinformers "github.com/openkruise/kruise/pkg/client/informers/externalversions"
kruiseappsinformers "github.com/openkruise/kruise/pkg/client/informers/externalversions/apps/v1alpha1"
kruiseExpectations "github.com/openkruise/kruise/pkg/util/expectations"
"github.com/openkruise/kruise/pkg/util/revisionadapter"
"github.com/openkruise/kruise/pkg/util/lifecycle"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -255,13 +255,14 @@ func NewDaemonSetController(
crControl: kubecontroller.RealControllerRevisionControl{
KubeClient: kubeClient,
},
- expectations:       kubecontroller.NewControllerExpectations(),
- updateExpectations: kruiseExpectations.NewUpdateExpectations(revisionadapter.NewDefaultImpl()),
- dsLister:           dsInformer.Lister(),
- historyLister:      revInformer.Lister(),
- podLister:          podInformer.Lister(),
- nodeLister:         nodeInformer.Lister(),
- failedPodsBackoff:  failedPodsBackoff,
+ lifecycleControl:            lifecycle.NewForInformer(podInformer),
+ expectations:                kubecontroller.NewControllerExpectations(),
+ resourceVersionExpectations: kruiseExpectations.NewResourceVersionExpectation(),
+ dsLister:                    dsInformer.Lister(),
+ historyLister:               revInformer.Lister(),
+ podLister:                   podInformer.Lister(),
+ nodeLister:                  nodeInformer.Lister(),
+ failedPodsBackoff:           failedPodsBackoff,
}
}
