fix some orphan pods cleaner bugs (#878) (#883)
* fix some orphan pods cleaner bugs

* pod pending check

* reword

* fix typo

* fast path

* double check pod in apiserver
cofyc authored and tennix committed Sep 6, 2019
1 parent 4c6c104 commit 967e9d6
Showing 8 changed files with 217 additions and 29 deletions.
19 changes: 17 additions & 2 deletions cmd/controller-manager/main.go
@@ -141,8 +141,23 @@ func main() {
bsController := backupschedule.NewController(kubeCli, cli, informerFactory, kubeInformerFactory)
controllerCtx, cancel := context.WithCancel(context.Background())
defer cancel()
go informerFactory.Start(controllerCtx.Done())
go kubeInformerFactory.Start(controllerCtx.Done())

// Start informer factories after all controllers are initialized.
informerFactory.Start(controllerCtx.Done())
kubeInformerFactory.Start(controllerCtx.Done())

// Wait for all started informers' caches to sync.
for v, synced := range informerFactory.WaitForCacheSync(wait.NeverStop) {
if !synced {
glog.Fatalf("error syncing informer for %v", v)
}
}
for v, synced := range kubeInformerFactory.WaitForCacheSync(wait.NeverStop) {
if !synced {
glog.Fatalf("error syncing informer for %v", v)
}
}
glog.Infof("cache of informer factories sync successfully")

onStarted := func(ctx context.Context) {
go wait.Forever(func() { backupController.Run(workers, ctx.Done()) }, waitDuration)
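To make the new ordering concrete, here is a minimal sketch of the pattern this hunk adopts; the podController type and newPodController constructor are hypothetical stand-ins for the real controllers. SharedInformerFactory.Start only launches informers that have already been requested, so the factory must be started after every controller is constructed, and one centralized WaitForCacheSync then replaces the per-controller checks removed in the hunks below.

package main

import (
	"context"

	"github.com/golang/glog"
	"k8s.io/apimachinery/pkg/util/wait"
	kubeinformers "k8s.io/client-go/informers"
	coreinformers "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes"
	corelisters "k8s.io/client-go/listers/core/v1"
)

// podController is a hypothetical minimal controller. Calling Lister() in
// the constructor is what registers the pod informer with the shared factory.
type podController struct {
	podLister corelisters.PodLister
}

func newPodController(pods coreinformers.PodInformer) *podController {
	return &podController{podLister: pods.Lister()}
}

func (c *podController) Run(stopCh <-chan struct{}) {
	// the real controllers process their work queues here
	<-stopCh
}

func runControllers(kubeCli kubernetes.Interface) {
	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeCli, 0)
	ctrl := newPodController(kubeInformerFactory.Core().V1().Pods())

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Start launches only the informers registered so far, hence it runs
	// after all controllers are constructed.
	kubeInformerFactory.Start(ctx.Done())

	// Block until every started informer's cache has synced; controllers
	// can then read from their listers immediately.
	for v, synced := range kubeInformerFactory.WaitForCacheSync(wait.NeverStop) {
		if !synced {
			glog.Fatalf("error syncing informer for %v", v)
		}
	}

	go ctrl.Run(ctx.Done())
	<-ctx.Done()
}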
4 changes: 0 additions & 4 deletions pkg/controller/backup/backup_controller.go
@@ -118,10 +118,6 @@ func (bkc *Controller) Run(workers int, stopCh <-chan struct{}) {
glog.Info("Starting backup controller")
defer glog.Info("Shutting down backup controller")

if !cache.WaitForCacheSync(stopCh, bkc.backupListerSynced) {
return
}

for i := 0; i < workers; i++ {
go wait.Until(bkc.worker, time.Second, stopCh)
}
4 changes: 0 additions & 4 deletions pkg/controller/backupschedule/backup_schedule_controller.go
@@ -114,10 +114,6 @@ func (bsc *Controller) Run(workers int, stopCh <-chan struct{}) {
glog.Info("Starting backup schedule controller")
defer glog.Info("Shutting down backup schedule controller")

if !cache.WaitForCacheSync(stopCh, bsc.bsListerSynced) {
return
}

for i := 0; i < workers; i++ {
go wait.Until(bsc.worker, time.Second, stopCh)
}
5 changes: 4 additions & 1 deletion pkg/controller/pod_control.go
@@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/label"
"github.com/pingcap/tidb-operator/pkg/pdapi"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
@@ -191,7 +192,9 @@ func (rpc *realPodControl) DeletePod(tc *v1alpha1.TidbCluster, pod *corev1.Pod)
ns := tc.GetNamespace()
tcName := tc.GetName()
podName := pod.GetName()
err := rpc.kubeCli.CoreV1().Pods(ns).Delete(podName, nil)
preconditions := metav1.Preconditions{UID: &pod.UID}
deleteOptions := metav1.DeleteOptions{Preconditions: &preconditions}
err := rpc.kubeCli.CoreV1().Pods(ns).Delete(podName, &deleteOptions)
if err != nil {
glog.Errorf("failed to delete Pod: [%s/%s], TidbCluster: %s, %v", ns, podName, tcName, err)
} else {
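The UID precondition turns the delete into a compare-and-delete: if the pod has been deleted and recreated under the same name (and therefore has a new UID), the apiserver refuses the request, which it reports as a Conflict error. Below is a minimal sketch of the same guard in isolation, using the pre-context Delete signature seen in this diff; deletePodByUID is a hypothetical helper name.

package main

import (
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// deletePodByUID deletes a pod only if its UID still matches the cached
// object, so a pod recreated under the same name is never deleted by mistake.
func deletePodByUID(kubeCli kubernetes.Interface, cached *corev1.Pod) error {
	preconditions := metav1.Preconditions{UID: &cached.UID}
	deleteOptions := metav1.DeleteOptions{Preconditions: &preconditions}
	err := kubeCli.CoreV1().Pods(cached.Namespace).Delete(cached.Name, &deleteOptions)
	if apierrors.IsConflict(err) {
		// UID mismatch: a different pod owns this name now; skip it.
		return nil
	}
	return err
}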
4 changes: 0 additions & 4 deletions pkg/controller/restore/restore_controller.go
@@ -117,10 +117,6 @@ func (rsc *Controller) Run(workers int, stopCh <-chan struct{}) {
glog.Info("Starting restore controller")
defer glog.Info("Shutting down restore controller")

if !cache.WaitForCacheSync(stopCh, rsc.restoreListerSynced) {
return
}

for i := 0; i < workers; i++ {
go wait.Until(rsc.worker, time.Second, stopCh)
}
5 changes: 1 addition & 4 deletions pkg/controller/tidbcluster/tidb_cluster_controller.go
@@ -170,6 +170,7 @@ func NewController(
podInformer.Lister(),
podControl,
pvcInformer.Lister(),
kubeCli,
),
recorder,
),
@@ -210,10 +211,6 @@ func (tcc *Controller) Run(workers int, stopCh <-chan struct{}) {
glog.Info("Starting tidbcluster controller")
defer glog.Info("Shutting down tidbcluster controller")

if !cache.WaitForCacheSync(stopCh, tcc.tcListerSynced, tcc.setListerSynced) {
return
}

for i := 0; i < workers; i++ {
go wait.Until(tcc.worker, time.Second, stopCh)
}
67 changes: 61 additions & 6 deletions pkg/manager/member/orphan_pods_cleaner.go
@@ -18,17 +18,35 @@ import (
"github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/pingcap/tidb-operator/pkg/label"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
)

const (
skipReasonOrphanPodsCleanerIsNotPDOrTiKV = "orphan pods cleaner: member type is not pd or tikv"
skipReasonOrphanPodsCleanerPVCNameIsEmpty = "orphan pods cleaner: pvcName is empty"
skipReasonOrphanPodsCleanerPVCIsFound = "orphan pods cleaner: pvc is found"
skipReasonOrphanPodsCleanerIsNotPDOrTiKV = "orphan pods cleaner: member type is not pd or tikv"
skipReasonOrphanPodsCleanerPVCNameIsEmpty = "orphan pods cleaner: pvcName is empty"
skipReasonOrphanPodsCleanerPVCIsFound = "orphan pods cleaner: pvc is found"
skipReasonOrphanPodsCleanerPodIsNotPending = "orphan pods cleaner: pod is not pending"
skipReasonOrphanPodsCleanerPodIsNotFound = "orphan pods cleaner: pod does not exist anymore"
skipReasonOrphanPodsCleanerPodChanged = "orphan pods cleaner: pod changed before deletion"
)

// OrphanPodsCleaner implements the logic for cleaning orphan pods (pods that have no PVC)
//
// In scale-out and failover, we try to delete the old PVC to prevent it
// from being used by the new pod. However, the PVC might not be deleted
// immediately in the apiserver because of finalizers (e.g.
// kubernetes.io/pvc-protection), and the statefulset controller may not
// have received the PVC delete event yet when it tries to create the new
// replica, so the new pod will be pending forever because it has no PVC to
// use. We need to clean up these orphan pods and let the statefulset
// controller create the PVC(s) for them again.
//
// https://github.com/kubernetes/kubernetes/blob/84fe3db5cf58bf0fc8ff792b885465ceaf70a435/pkg/controller/statefulset/stateful_pod_control.go#L175-L199
//
type OrphanPodsCleaner interface {
Clean(*v1alpha1.TidbCluster) (map[string]string, error)
}
@@ -37,13 +55,15 @@ type orphanPodsCleaner struct {
podLister corelisters.PodLister
podControl controller.PodControlInterface
pvcLister corelisters.PersistentVolumeClaimLister
kubeCli kubernetes.Interface
}

// NewOrphanPodsCleaner returns an OrphanPodsCleaner
func NewOrphanPodsCleaner(podLister corelisters.PodLister,
podControl controller.PodControlInterface,
pvcLister corelisters.PersistentVolumeClaimLister) OrphanPodsCleaner {
return &orphanPodsCleaner{podLister, podControl, pvcLister}
pvcLister corelisters.PersistentVolumeClaimLister,
kubeCli kubernetes.Interface) OrphanPodsCleaner {
return &orphanPodsCleaner{podLister, podControl, pvcLister, kubeCli}
}

func (opc *orphanPodsCleaner) Clean(tc *v1alpha1.TidbCluster) (map[string]string, error) {
@@ -68,6 +88,12 @@ func (opc *orphanPodsCleaner) Clean(tc *v1alpha1.TidbCluster) (map[string]string, error) {
continue
}

if pod.Status.Phase != v1.PodPending {
skipReason[podName] = skipReasonOrphanPodsCleanerPodIsNotPending
continue
}

// TODO: support the case of multiple PVCs?
var pvcName string
for _, vol := range pod.Spec.Volumes {
if vol.PersistentVolumeClaim != nil {
@@ -80,7 +106,9 @@ func (opc *orphanPodsCleaner) Clean(tc *v1alpha1.TidbCluster) (map[string]string, error) {
continue
}

_, err := opc.pvcLister.PersistentVolumeClaims(ns).Get(pvcName)
var err error
// check informer cache
_, err = opc.pvcLister.PersistentVolumeClaims(ns).Get(pvcName)
if err == nil {
skipReason[podName] = skipReasonOrphanPodsCleanerPVCIsFound
continue
@@ -89,6 +117,33 @@ func (opc *orphanPodsCleaner) Clean(tc *v1alpha1.TidbCluster) (map[string]string, error) {
return skipReason, err
}

// if the PVC is not found in the cache, re-check the apiserver directly to make sure it really does not exist
_, err = opc.kubeCli.CoreV1().PersistentVolumeClaims(ns).Get(pvcName, metav1.GetOptions{})
if err == nil {
skipReason[podName] = skipReasonOrphanPodsCleanerPVCIsFound
continue
}
if !errors.IsNotFound(err) {
return skipReason, err
}

// if the PVC is not found in the apiserver (nor in the informer cache)
// and the pod is Pending, delete the pod and let the statefulset
// controller create the pod and its PVC(s) again
apiPod, err := opc.kubeCli.CoreV1().Pods(ns).Get(podName, metav1.GetOptions{})
if errors.IsNotFound(err) {
skipReason[podName] = skipReasonOrphanPodsCleanerPodIsNotFound
continue
}
if err != nil {
return skipReason, err
}
// try our best to avoid deleting the wrong object in the apiserver
// TODO: upgrade to deleteOptions.Preconditions.ResourceVersion in client-go 1.14+
if apiPod.UID != pod.UID || apiPod.ResourceVersion != pod.ResourceVersion {
skipReason[podName] = skipReasonOrphanPodsCleanerPodChanged
continue
}
err = opc.podControl.DeletePod(tc, pod)
if err != nil {
glog.Errorf("orphan pods cleaner: failed to clean orphan pod: %s/%s, %v", ns, podName, err)
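Putting the last guard in one place: the cleaner re-reads the pod from the apiserver and deletes it only if both UID and ResourceVersion still match the cached copy, so a pod that was recreated or merely updated since the lister snapshot is skipped. Here is a standalone sketch of that check under the same client-go version; podUnchangedInAPIServer is a hypothetical helper name.

package main

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// podUnchangedInAPIServer reports whether the cached pod still matches the
// live object in identity (UID) and version (ResourceVersion). A false
// result means the pod is gone or has changed, so it must not be deleted
// based on stale cache data.
func podUnchangedInAPIServer(kubeCli kubernetes.Interface, cached *corev1.Pod) (bool, error) {
	live, err := kubeCli.CoreV1().Pods(cached.Namespace).Get(cached.Name, metav1.GetOptions{})
	if errors.IsNotFound(err) {
		return false, nil // already gone: nothing to delete
	}
	if err != nil {
		return false, err
	}
	return live.UID == cached.UID && live.ResourceVersion == cached.ResourceVersion, nil
}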