Skip to content

Commit

Permalink
Add timestamp annotation in tidbcluster statefulset (#1875)
Browse files Browse the repository at this point in the history
* Add timestamp annotation in tidbcluster statefulset
  • Loading branch information
Yisaer authored Mar 9, 2020
1 parent b4c6a51 commit 622949b
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 1 deletion.
10 changes: 10 additions & 0 deletions cmd/controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/controller/autoscaler"
"github.com/pingcap/tidb-operator/pkg/controller/backup"
"github.com/pingcap/tidb-operator/pkg/controller/backupschedule"
"github.com/pingcap/tidb-operator/pkg/controller/periodicity"
"github.com/pingcap/tidb-operator/pkg/controller/restore"
"github.com/pingcap/tidb-operator/pkg/controller/tidbcluster"
"github.com/pingcap/tidb-operator/pkg/controller/tidbinitializer"
Expand Down Expand Up @@ -190,6 +191,12 @@ func main() {
bsController := backupschedule.NewController(kubeCli, cli, informerFactory, kubeInformerFactory)
tidbInitController := tidbinitializer.NewController(kubeCli, cli, genericCli, informerFactory, kubeInformerFactory)
tidbMonitorController := tidbmonitor.NewController(kubeCli, genericCli, informerFactory, kubeInformerFactory)

var periodicityController *periodicity.Controller
if controller.PodWebhookEnabled {
periodicityController = periodicity.NewController(kubeCli, informerFactory, kubeInformerFactory)
}

var autoScalerController *autoscaler.Controller
if features.DefaultFeatureGate.Enabled(features.AutoScaling) {
autoScalerController = autoscaler.NewController(kubeCli, cli, informerFactory, kubeInformerFactory)
Expand All @@ -216,6 +223,9 @@ func main() {
go wait.Forever(func() { bsController.Run(workers, ctx.Done()) }, waitDuration)
go wait.Forever(func() { tidbInitController.Run(workers, ctx.Done()) }, waitDuration)
go wait.Forever(func() { tidbMonitorController.Run(workers, ctx.Done()) }, waitDuration)
if controller.PodWebhookEnabled {
go wait.Forever(func() { periodicityController.Run(ctx.Done()) }, waitDuration)
}
if features.DefaultFeatureGate.Enabled(features.AutoScaling) {
go wait.Forever(func() { autoScalerController.Run(workers, ctx.Done()) }, waitDuration)
}
Expand Down
124 changes: 124 additions & 0 deletions pkg/controller/periodicity/periodicity_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// Package periodicity dedicate the periodicity controller.
// This controller updates StatefulSets managed by our operator periodically.
// This is necessary when the pod admission webhook is used. Because we will
// deny pod deletion requests if the pod is not ready for deletion. However,
// retry duration on StatefulSet in its controller grows exponentially on
// failures. So we need to update StatefulSets to trigger events, then they
// will be put into the process queue of StatefulSet controller constantly.
// Refer to https://github.com/pingcap/tidb-operator/pull/1875 and
// https://github.com/pingcap/tidb-operator/issues/1846 for more details.
package periodicity

import (
"k8s.io/apimachinery/pkg/util/wait"
"time"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions"
v1alpha1listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/pingcap/tidb-operator/pkg/label"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/errors"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
eventv1 "k8s.io/client-go/kubernetes/typed/core/v1"
appslisters "k8s.io/client-go/listers/apps/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
)

type Controller struct {
stsLister appslisters.StatefulSetLister
tcLister v1alpha1listers.TidbClusterLister
statefulSetControl controller.StatefulSetControlInterface
}

func NewController(
kubeCli kubernetes.Interface,
informerFactory informers.SharedInformerFactory,
kubeInformerFactory kubeinformers.SharedInformerFactory) *Controller {

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&eventv1.EventSinkImpl{
Interface: eventv1.New(kubeCli.CoreV1().RESTClient()).Events("")})
recorder := eventBroadcaster.NewRecorder(v1alpha1.Scheme, corev1.EventSource{Component: "periodiciy-controller"})
stsLister := kubeInformerFactory.Apps().V1().StatefulSets().Lister()

return &Controller{
tcLister: informerFactory.Pingcap().V1alpha1().TidbClusters().Lister(),
statefulSetControl: controller.NewRealStatefuSetControl(kubeCli, stsLister, recorder),
stsLister: stsLister,
}

}

func (c *Controller) Run(stopCh <-chan struct{}) {
klog.Infof("Staring periodicity controller")
defer klog.Infof("Shutting down periodicity controller")
wait.Until(c.run, time.Minute, stopCh)
}

func (c *Controller) run() {
var errs []error
if err := c.syncStatefulSetTimeStamp(); err != nil {
errs = append(errs, err)
}
if len(errs) > 0 {
klog.Errorf("error happened in periodicity controller,err:%v", errors.NewAggregate(errs))
}
}

// in this sync function, we update all stateful sets the operator managed and log errors
func (c *Controller) syncStatefulSetTimeStamp() error {
selector, err := label.New().Selector()
if err != nil {
return err
}
stsList, err := c.stsLister.List(selector)
if err != nil {
return err
}
var errs []error
for _, sts := range stsList {
// If there is any error during our sts annotation updating, we just collect the error
// and continue to next sts
if sts.Annotations == nil {
sts.Annotations = map[string]string{}
}
if sts.Labels == nil {
sts.Labels = map[string]string{}
}
tcName, ok := sts.Labels[label.InstanceLabelKey]
if !ok {
continue
}
tc, err := c.tcLister.TidbClusters(sts.Namespace).Get(tcName)
if err != nil {
errs = append(errs, err)
continue
}
sts.Annotations[label.AnnStsLastSyncTimestamp] = time.Now().Format(time.RFC3339)
newSts, err := c.statefulSetControl.UpdateStatefulSet(tc, sts)
if err != nil {
klog.Errorf("failed to update statefulset %q, error: %v", sts.Name, err)
errs = append(errs, err)
}
klog.Infof("successfully updated statefulset %q", newSts.Name)
}
return errors.NewAggregate(errs)
}
2 changes: 2 additions & 0 deletions pkg/label/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ const (
AnnEvictLeaderBeginTime = "tidb.pingcap.com/evictLeaderBeginTime"
// AnnPodDeferDeleting is pod annotation key to indicate the pod which need to be restarted
AnnPodDeferDeleting = "tidb.pingcap.com/pod-defer-deleting"
// AnnStsSyncTimestamp is sts annotation key to indicate the last timestamp the operator sync the sts
AnnStsLastSyncTimestamp = "tidb.pingcap.com/sync-timestamp"

// AnnForceUpgradeVal is tc annotation value to indicate whether force upgrade should be done
AnnForceUpgradeVal = "true"
Expand Down
5 changes: 4 additions & 1 deletion pkg/manager/member/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"crypto/sha256"
"encoding/json"
"fmt"

"github.com/BurntSushi/toml"
"github.com/pingcap/advanced-statefulset/pkg/apis/apps/v1/helper"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
Expand Down Expand Up @@ -281,6 +280,10 @@ func updateStatefulSet(setCtl controller.StatefulSetControlInterface, tc *v1alph
set.Spec.Template.Annotations[LastAppliedConfigAnnotation] = podConfig
}
set.Annotations = newSet.Annotations
v, ok := oldSet.Annotations[label.AnnStsLastSyncTimestamp]
if ok {
set.Annotations[label.AnnStsLastSyncTimestamp] = v
}
*set.Spec.Replicas = *newSet.Spec.Replicas
set.Spec.UpdateStrategy = newSet.Spec.UpdateStrategy
if isOrphan {
Expand Down

0 comments on commit 622949b

Please sign in to comment.