Skip to content

Commit

Permalink
remove records of deleted pods from failure stores automatically
Browse files Browse the repository at this point in the history
  • Loading branch information
cofyc committed May 26, 2020
1 parent 163c4a3 commit f38ffbb
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 61 deletions.
29 changes: 27 additions & 2 deletions pkg/manager/member/tiflash_failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ import (
"time"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/util"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
)

// TODO reuse tikvFailover since we share the same logic
type tiflashFailover struct {
tiflashFailoverPeriod time.Duration
recorder record.EventRecorder
Expand All @@ -34,6 +36,16 @@ func NewTiFlashFailover(tiflashFailoverPeriod time.Duration, recorder record.Eve
return &tiflashFailover{tiflashFailoverPeriod, recorder}
}

// isPodDesired reports whether podName belongs to the set of ordinals the
// TiFlash statefulset is expected to manage. Pods outside that set (e.g.
// removed via the advanced statefulset delete-slots feature) are not desired.
// A pod name that cannot be parsed into an ordinal is treated as not desired.
func (tff *tiflashFailover) isPodDesired(tc *v1alpha1.TidbCluster, podName string) bool {
	// Validate the pod name first so we bail out early without computing
	// the desired ordinal set for a malformed name.
	ordinal, err := util.GetOrdinalFromPodName(podName)
	if err != nil {
		klog.Errorf("unexpected pod name %q: %v", podName, err)
		return false
	}
	return tc.TiFlashStsDesiredOrdinals(true).Has(ordinal)
}

func (tff *tiflashFailover) Failover(tc *v1alpha1.TidbCluster) error {
ns := tc.GetNamespace()
tcName := tc.GetName()
Expand All @@ -43,6 +55,12 @@ func (tff *tiflashFailover) Failover(tc *v1alpha1.TidbCluster) error {
if store.LastTransitionTime.IsZero() {
continue
}
if !tff.isPodDesired(tc, podName) {
// we should ignore the store record of deleted pod, otherwise the
// record of deleted pod may be added back to failure stores
// (before it enters into Offline/Tombstone state)
continue
}
deadline := store.LastTransitionTime.Add(tff.tiflashFailoverPeriod)
exist := false
for _, failureStore := range tc.Status.TiFlash.FailureStores {
Expand Down Expand Up @@ -74,8 +92,15 @@ func (tff *tiflashFailover) Failover(tc *v1alpha1.TidbCluster) error {
return nil
}

func (tff *tiflashFailover) Recover(_ *v1alpha1.TidbCluster) {
// Do nothing now
// Recover removes failure-store records whose pods are no longer desired.
// If pods were deleted (e.g. via the advanced statefulset delete-slots
// feature), keeping their records around would cause an extra replacement
// pod to be created, so the stale entries are dropped here.
func (tff *tiflashFailover) Recover(tc *v1alpha1.TidbCluster) {
	for storeID, fs := range tc.Status.TiFlash.FailureStores {
		if tff.isPodDesired(tc, fs.PodName) {
			continue
		}
		// Deleting entries from a map while ranging over it is safe in Go.
		delete(tc.Status.TiFlash.FailureStores, storeID)
	}
}

type fakeTiFlashFailover struct{}
Expand Down
28 changes: 26 additions & 2 deletions pkg/manager/member/tikv_failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/util"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
Expand All @@ -34,6 +35,16 @@ func NewTiKVFailover(tikvFailoverPeriod time.Duration, recorder record.EventReco
return &tikvFailover{tikvFailoverPeriod, recorder}
}

// isPodDesired reports whether podName belongs to the set of ordinals the
// TiKV statefulset is expected to manage. Pods outside that set (e.g.
// removed via the advanced statefulset delete-slots feature) are not desired.
// A pod name that cannot be parsed into an ordinal is treated as not desired.
func (tf *tikvFailover) isPodDesired(tc *v1alpha1.TidbCluster, podName string) bool {
	// Validate the pod name first so we bail out early without computing
	// the desired ordinal set for a malformed name.
	ordinal, err := util.GetOrdinalFromPodName(podName)
	if err != nil {
		klog.Errorf("unexpected pod name %q: %v", podName, err)
		return false
	}
	return tc.TiKVStsDesiredOrdinals(true).Has(ordinal)
}

func (tf *tikvFailover) Failover(tc *v1alpha1.TidbCluster) error {
ns := tc.GetNamespace()
tcName := tc.GetName()
Expand All @@ -43,6 +54,12 @@ func (tf *tikvFailover) Failover(tc *v1alpha1.TidbCluster) error {
if store.LastTransitionTime.IsZero() {
continue
}
if !tf.isPodDesired(tc, podName) {
// we should ignore the store record of deleted pod, otherwise the
// record of deleted pod may be added back to failure stores
// (before it enters into Offline/Tombstone state)
continue
}
deadline := store.LastTransitionTime.Add(tf.tikvFailoverPeriod)
exist := false
for _, failureStore := range tc.Status.TiKV.FailureStores {
Expand Down Expand Up @@ -75,8 +92,15 @@ func (tf *tikvFailover) Failover(tc *v1alpha1.TidbCluster) error {
return nil
}

func (tf *tikvFailover) Recover(_ *v1alpha1.TidbCluster) {
// Do nothing now
// Recover removes failure-store records whose pods are no longer desired.
// If pods were deleted (e.g. via the advanced statefulset delete-slots
// feature), keeping their records around would cause an extra replacement
// pod to be created, so the stale entries are dropped here.
func (tf *tikvFailover) Recover(tc *v1alpha1.TidbCluster) {
	for storeID, fs := range tc.Status.TiKV.FailureStores {
		if tf.isPodDesired(tc, fs.PodName) {
			continue
		}
		// Deleting entries from a map while ranging over it is safe in Go.
		delete(tc.Status.TiKV.FailureStores, storeID)
	}
}

type fakeTiKVFailover struct{}
Expand Down
110 changes: 53 additions & 57 deletions pkg/manager/member/tikv_failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,12 @@ import (
)

func TestTiKVFailoverFailover(t *testing.T) {
g := NewGomegaWithT(t)

type testcase struct {
tests := []struct {
name string
update func(*v1alpha1.TidbCluster)
err bool
expectFn func(*v1alpha1.TidbCluster)
}
testFn := func(test *testcase, t *testing.T) {
t.Log(test.name)
tc := newTidbClusterForPD()
tc.Spec.TiKV.MaxFailoverCount = pointer.Int32Ptr(3)
test.update(tc)
tikvFailover := newFakeTiKVFailover()

err := tikvFailover.Failover(tc)
if test.err {
g.Expect(err).To(HaveOccurred())
} else {
g.Expect(err).NotTo(HaveOccurred())
}
test.expectFn(tc)
}

tests := []testcase{
expectFn func(t *testing.T, tc *v1alpha1.TidbCluster)
}{
{
name: "normal",
update: func(tc *v1alpha1.TidbCluster) {
Expand All @@ -67,8 +48,8 @@ func TestTiKVFailoverFailover(t *testing.T) {
}
},
err: false,
expectFn: func(tc *v1alpha1.TidbCluster) {
g.Expect(int(tc.Spec.TiKV.Replicas)).To(Equal(3))
expectFn: func(t *testing.T, tc *v1alpha1.TidbCluster) {
g := NewGomegaWithT(t)
g.Expect(len(tc.Status.TiKV.FailureStores)).To(Equal(2))
},
},
Expand All @@ -80,8 +61,8 @@ func TestTiKVFailoverFailover(t *testing.T) {
}
},
err: false,
expectFn: func(tc *v1alpha1.TidbCluster) {
g.Expect(int(tc.Spec.TiKV.Replicas)).To(Equal(3))
expectFn: func(t *testing.T, tc *v1alpha1.TidbCluster) {
g := NewGomegaWithT(t)
g.Expect(len(tc.Status.TiKV.FailureStores)).To(Equal(0))
},
},
Expand All @@ -97,8 +78,8 @@ func TestTiKVFailoverFailover(t *testing.T) {
}
},
err: false,
expectFn: func(tc *v1alpha1.TidbCluster) {
g.Expect(int(tc.Spec.TiKV.Replicas)).To(Equal(3))
expectFn: func(t *testing.T, tc *v1alpha1.TidbCluster) {
g := NewGomegaWithT(t)
g.Expect(len(tc.Status.TiKV.FailureStores)).To(Equal(0))
},
},
Expand All @@ -113,8 +94,8 @@ func TestTiKVFailoverFailover(t *testing.T) {
}
},
err: false,
expectFn: func(tc *v1alpha1.TidbCluster) {
g.Expect(int(tc.Spec.TiKV.Replicas)).To(Equal(3))
expectFn: func(t *testing.T, tc *v1alpha1.TidbCluster) {
g := NewGomegaWithT(t)
g.Expect(len(tc.Status.TiKV.FailureStores)).To(Equal(0))
},
},
Expand All @@ -136,8 +117,8 @@ func TestTiKVFailoverFailover(t *testing.T) {
}
},
err: false,
expectFn: func(tc *v1alpha1.TidbCluster) {
g.Expect(int(tc.Spec.TiKV.Replicas)).To(Equal(3))
expectFn: func(t *testing.T, tc *v1alpha1.TidbCluster) {
g := NewGomegaWithT(t)
g.Expect(len(tc.Status.TiKV.FailureStores)).To(Equal(1))
},
},
Expand All @@ -147,17 +128,17 @@ func TestTiKVFailoverFailover(t *testing.T) {
tc.Status.TiKV.Stores = map[string]v1alpha1.TiKVStore{
"3": {
State: v1alpha1.TiKVStateDown,
PodName: "tikv-3",
PodName: "tikv-0",
LastTransitionTime: metav1.Time{Time: time.Now().Add(-70 * time.Minute)},
},
"10": {
"4": {
State: v1alpha1.TiKVStateUp,
PodName: "tikv-10",
PodName: "tikv-4",
LastTransitionTime: metav1.Time{Time: time.Now().Add(-70 * time.Minute)},
},
"11": {
"5": {
State: v1alpha1.TiKVStateUp,
PodName: "tikv-11",
PodName: "tikv-5",
LastTransitionTime: metav1.Time{Time: time.Now().Add(-61 * time.Minute)},
},
}
Expand All @@ -173,8 +154,8 @@ func TestTiKVFailoverFailover(t *testing.T) {
}
},
err: false,
expectFn: func(tc *v1alpha1.TidbCluster) {
g.Expect(int(tc.Spec.TiKV.Replicas)).To(Equal(3))
expectFn: func(t *testing.T, tc *v1alpha1.TidbCluster) {
g := NewGomegaWithT(t)
g.Expect(len(tc.Status.TiKV.FailureStores)).To(Equal(3))
},
},
Expand All @@ -187,14 +168,14 @@ func TestTiKVFailoverFailover(t *testing.T) {
PodName: "tikv-3",
LastTransitionTime: metav1.Time{Time: time.Now().Add(-70 * time.Minute)},
},
"10": {
"4": {
State: v1alpha1.TiKVStateDown,
PodName: "tikv-10",
PodName: "tikv-4",
LastTransitionTime: metav1.Time{Time: time.Now().Add(-70 * time.Minute)},
},
"11": {
"5": {
State: v1alpha1.TiKVStateUp,
PodName: "tikv-11",
PodName: "tikv-5",
LastTransitionTime: metav1.Time{Time: time.Now().Add(-61 * time.Minute)},
},
}
Expand All @@ -210,28 +191,28 @@ func TestTiKVFailoverFailover(t *testing.T) {
}
},
err: false,
expectFn: func(tc *v1alpha1.TidbCluster) {
g.Expect(int(tc.Spec.TiKV.Replicas)).To(Equal(3))
expectFn: func(t *testing.T, tc *v1alpha1.TidbCluster) {
g := NewGomegaWithT(t)
g.Expect(len(tc.Status.TiKV.FailureStores)).To(Equal(3))
},
},
{
name: "exceed max failover count2",
update: func(tc *v1alpha1.TidbCluster) {
tc.Status.TiKV.Stores = map[string]v1alpha1.TiKVStore{
"12": {
"0": {
State: v1alpha1.TiKVStateDown,
PodName: "tikv-12",
PodName: "tikv-0",
LastTransitionTime: metav1.Time{Time: time.Now().Add(-70 * time.Minute)},
},
"13": {
"4": {
State: v1alpha1.TiKVStateDown,
PodName: "tikv-13",
PodName: "tikv-4",
LastTransitionTime: metav1.Time{Time: time.Now().Add(-61 * time.Minute)},
},
"14": {
"5": {
State: v1alpha1.TiKVStateDown,
PodName: "tikv-14",
PodName: "tikv-5",
LastTransitionTime: metav1.Time{Time: time.Now().Add(-70 * time.Minute)},
},
}
Expand All @@ -251,8 +232,8 @@ func TestTiKVFailoverFailover(t *testing.T) {
}
},
err: false,
expectFn: func(tc *v1alpha1.TidbCluster) {
g.Expect(int(tc.Spec.TiKV.Replicas)).To(Equal(3))
expectFn: func(t *testing.T, tc *v1alpha1.TidbCluster) {
g := NewGomegaWithT(t)
g.Expect(len(tc.Status.TiKV.FailureStores)).To(Equal(3))
},
},
Expand Down Expand Up @@ -293,14 +274,29 @@ func TestTiKVFailoverFailover(t *testing.T) {
}
},
err: false,
expectFn: func(tc *v1alpha1.TidbCluster) {
g.Expect(int(tc.Spec.TiKV.Replicas)).To(Equal(3))
expectFn: func(t *testing.T, tc *v1alpha1.TidbCluster) {
g := NewGomegaWithT(t)
g.Expect(len(tc.Status.TiKV.FailureStores)).To(Equal(3))
},
},
}
for i := range tests {
testFn(&tests[i], t)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewGomegaWithT(t)
tc := newTidbClusterForPD()
tc.Spec.TiKV.Replicas = 6
tc.Spec.TiKV.MaxFailoverCount = pointer.Int32Ptr(3)
tt.update(tc)
tikvFailover := newFakeTiKVFailover()

err := tikvFailover.Failover(tc)
if tt.err {
g.Expect(err).To(HaveOccurred())
} else {
g.Expect(err).NotTo(HaveOccurred())
}
tt.expectFn(t, tc)
})
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/manager/member/tikv_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ func (tkmm *tikvMemberManager) syncStatefulSetForTidbCluster(tc *v1alpha1.TidbCl
}
}

if len(tc.Status.TiKV.FailureStores) > 0 {
tkmm.tikvFailover.Recover(tc)
}

return updateStatefulSet(tkmm.setControl, tc, newSet, oldSet)
}

Expand Down
Loading

0 comments on commit f38ffbb

Please sign in to comment.