Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

dm-operator/: support scaling a dm cluster with dm-masters and dm-workers #3186

Merged
merged 60 commits into from
Sep 2, 2020
Merged
Show file tree
Hide file tree
Changes from 58 commits
Commits
Show all changes
60 commits
Select commit. Hold shift + click to select a range.
08a8ae4
add dmclusters CRD
lichunzhu Aug 5, 2020
5cb82ac
resolve conflicts
lichunzhu Aug 5, 2020
bcf58ea
Merge branch 'master' into defineDMSpec
lichunzhu Aug 5, 2020
55a6365
address comment
lichunzhu Aug 6, 2020
0aae8c0
Merge branch 'master' into defineDMSpec
lichunzhu Aug 6, 2020
f146be1
address comments
lichunzhu Aug 6, 2020
94c427b
Merge branch 'defineDMSpec' of https://github.com/lichunzhu/tidb-oper…
lichunzhu Aug 6, 2020
4f10a51
delete monitor ref
lichunzhu Aug 6, 2020
a020492
generate dmcluster client
lichunzhu Aug 6, 2020
3edfaa2
address comments
lichunzhu Aug 6, 2020
74aca39
Merge branch 'master' into defineDMSpec
lichunzhu Aug 7, 2020
2c1bec5
address comment
lichunzhu Aug 7, 2020
fbe26f3
tmp commit
lichunzhu Aug 7, 2020
d85a9fc
resolve conflict
lichunzhu Aug 7, 2020
a9da15f
merge master
lichunzhu Aug 11, 2020
ba0f518
remove dm package
lichunzhu Aug 12, 2020
7a51c07
fix bugs
lichunzhu Aug 12, 2020
3ec6c86
fix bug
lichunzhu Aug 12, 2020
8122c71
support start dm-master and dm-worker in cluster
lichunzhu Aug 18, 2020
a38bb0e
fix some bugs
lichunzhu Aug 19, 2020
1e7efd8
merge master branch and resolve conflicts
lichunzhu Aug 19, 2020
850035c
fix ut
lichunzhu Aug 20, 2020
cd3f5b4
fix dm-master start
lichunzhu Aug 24, 2020
f4a641d
fix dm-worker start bug
lichunzhu Aug 24, 2020
5a51cbe
Merge branch 'master' of https://github.com/pingcap/tidb-operator int…
lichunzhu Aug 25, 2020
be4fd5e
add more column info
lichunzhu Aug 25, 2020
1e4f2ac
Merge branch 'master' of https://github.com/pingcap/tidb-operator int…
lichunzhu Aug 25, 2020
daf81f7
Merge branch 'master' into supportStartDMCluster
lichunzhu Aug 25, 2020
f7c70bd
fix ut
lichunzhu Aug 25, 2020
08f4695
fix verify
lichunzhu Aug 25, 2020
e2249f9
fix verify again
lichunzhu Aug 25, 2020
161b862
address comments
lichunzhu Aug 25, 2020
35a69d3
Merge branch 'master' into supportStartDMCluster
lichunzhu Aug 25, 2020
d3da808
regenerate code
lichunzhu Aug 25, 2020
8e52fbb
address comments
lichunzhu Aug 25, 2020
cefe7b8
Merge branch 'master' into supportStartDMCluster
lichunzhu Aug 25, 2020
f1b6029
fix import cycle problem
lichunzhu Aug 26, 2020
7726669
Merge branch 'supportStartDMCluster' of https://github.com/lichunzhu/…
lichunzhu Aug 26, 2020
84a6309
fix check
lichunzhu Aug 26, 2020
5ab015d
address comments
lichunzhu Aug 26, 2020
c33746e
support graceful upgrade for dm-master
lichunzhu Aug 26, 2020
b998616
merge master
lichunzhu Aug 26, 2020
f23a336
address comments
lichunzhu Aug 27, 2020
7282732
Merge branch 'master' into supportUpgradeDMCluster
lichunzhu Aug 27, 2020
cdc031a
add dm-master scaler
lichunzhu Aug 28, 2020
bb94d5e
tmp
lichunzhu Aug 28, 2020
d5cfc31
support scale dm cluster
lichunzhu Aug 31, 2020
c3d3592
merge master and resolve conflicts
lichunzhu Aug 31, 2020
bf0db90
let scaling take precedence over upgrading
lichunzhu Aug 31, 2020
f6e5a55
fix ut
lichunzhu Aug 31, 2020
e9e6032
Merge branch 'master' into supportScaleDMCluster
lichunzhu Sep 1, 2020
38db6b3
Merge branch 'master' into supportScaleDMCluster
lichunzhu Sep 1, 2020
57803d5
address comments
lichunzhu Sep 1, 2020
aa4903b
Merge branch 'supportScaleDMCluster' of https://github.com/lichunzhu/…
lichunzhu Sep 1, 2020
57e925d
Merge branch 'master' into supportScaleDMCluster
lichunzhu Sep 2, 2020
4b2deec
address comment
lichunzhu Sep 2, 2020
979fd98
address comment
lichunzhu Sep 2, 2020
ca4048c
Merge branch 'master' into supportScaleDMCluster
lichunzhu Sep 2, 2020
ead903f
address comment
lichunzhu Sep 2, 2020
12d8cba
Merge branch 'supportScaleDMCluster' of https://github.com/lichunzhu/…
lichunzhu Sep 2, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/apis/pingcap/v1alpha1/crd_kinds.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (
TiDBClusterKindKey = "tidbcluster"

DMClusterName = "dmclusters"
DMCLusterKind = "DMCluster"
DMClusterKind = "DMCluster"
DMClusterKindKey = "dmcluster"

BackupName = "backups"
Expand Down Expand Up @@ -89,7 +89,7 @@ type CrdKinds struct {
var DefaultCrdKinds = CrdKinds{
KindsString: "",
TiDBCluster: CrdKind{Plural: TiDBClusterName, Kind: TiDBClusterKind, ShortNames: []string{"tc"}, SpecName: SpecPath + TiDBClusterKind},
DMCluster: CrdKind{Plural: DMClusterName, Kind: DMCLusterKind, ShortNames: []string{"dc"}, SpecName: SpecPath + DMCLusterKind},
DMCluster: CrdKind{Plural: DMClusterName, Kind: DMClusterKind, ShortNames: []string{"dc"}, SpecName: SpecPath + DMClusterKind},
Backup: CrdKind{Plural: BackupName, Kind: BackupKind, ShortNames: []string{"bk"}, SpecName: SpecPath + BackupKind},
Restore: CrdKind{Plural: RestoreName, Kind: RestoreKind, ShortNames: []string{"rt"}, SpecName: SpecPath + RestoreKind},
BackupSchedule: CrdKind{Plural: BackupScheduleName, Kind: BackupScheduleKind, ShortNames: []string{"bks"}, SpecName: SpecPath + BackupScheduleKind},
Expand Down
51 changes: 51 additions & 0 deletions pkg/apis/pingcap/v1alpha1/dmcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@
package v1alpha1

import (
"encoding/json"
"fmt"
"strings"

"github.com/pingcap/advanced-statefulset/client/apis/apps/v1/helper"
"github.com/pingcap/tidb-operator/pkg/label"
"k8s.io/apimachinery/pkg/util/sets"
)

func (dc *DMCluster) Scheme() string {
Expand All @@ -33,6 +38,14 @@ func (dc *DMCluster) Timezone() string {
return tz
}

// IsPVReclaimEnabled reports whether PV reclaiming is enabled for this DM
// cluster, falling back to the package-level default when the spec leaves
// the field unset.
func (dc *DMCluster) IsPVReclaimEnabled() bool {
	if flag := dc.Spec.EnablePVReclaim; flag != nil {
		return *flag
	}
	return defaultEnablePVReclaim
}

// IsTLSClusterEnabled reports whether intra-cluster TLS is turned on in the
// spec. A nil TLSCluster section means TLS is disabled.
func (dc *DMCluster) IsTLSClusterEnabled() bool {
	if dc.Spec.TLSCluster == nil {
		return false
	}
	return dc.Spec.TLSCluster.Enabled
}
Expand Down Expand Up @@ -84,6 +97,17 @@ func (dc *DMCluster) WorkerStsDesiredReplicas() int32 {
return dc.Spec.Worker.Replicas
}

// WorkerStsDesiredOrdinals returns the set of pod ordinals the dm-worker
// StatefulSet should own, honoring any delete-slots annotation. When
// excludeFailover is true only the replicas declared in the spec are
// counted; otherwise the full desired replica count is used.
func (dc *DMCluster) WorkerStsDesiredOrdinals(excludeFailover bool) sets.Int32 {
	if dc.Spec.Worker == nil {
		return sets.Int32{}
	}
	var desired int32
	if excludeFailover {
		desired = dc.Spec.Worker.Replicas
	} else {
		desired = dc.WorkerStsDesiredReplicas()
	}
	return helper.GetPodOrdinalsFromReplicasAndDeleteSlots(desired, dc.getDeleteSlots(label.DMWorkerLabelVal))
}

// GetInstanceName returns the instance identifier for this DM cluster,
// which is simply the cluster object's metadata name.
func (dc *DMCluster) GetInstanceName() string {
	return dc.Name
}
Expand Down Expand Up @@ -130,6 +154,33 @@ func (dc *DMCluster) MasterScaling() bool {
return dc.Status.Master.Phase == ScalePhase
}

// getDeleteSlots parses the delete-slots annotation for the given DM
// component (dm-master or dm-worker) and returns the set of pod ordinals
// that must not be scheduled. An unknown component, a missing annotation,
// or a malformed annotation value all yield an empty set.
func (dc *DMCluster) getDeleteSlots(component string) (deleteSlots sets.Int32) {
	deleteSlots = sets.NewInt32()
	annotations := dc.GetAnnotations()
	if annotations == nil {
		return deleteSlots
	}
	var key string
	switch component {
	case label.DMMasterLabelVal:
		key = label.AnnDMMasterDeleteSlots
	case label.DMWorkerLabelVal:
		key = label.AnnDMWorkerDeleteSlots
	default:
		return
	}
	raw, ok := annotations[key]
	if !ok {
		return
	}
	var ordinals []int32
	// A malformed annotation is deliberately ignored rather than surfaced:
	// the caller just sees no delete slots.
	if err := json.Unmarshal([]byte(raw), &ordinals); err != nil {
		return
	}
	deleteSlots.Insert(ordinals...)
	return
}

func (dc *DMCluster) MasterIsAvailable() bool {
lowerLimit := dc.Spec.Master.Replicas/2 + 1
if int32(len(dc.Status.Master.Members)) < lowerLimit {
Expand Down
91 changes: 43 additions & 48 deletions pkg/controller/dmcluster/dm_cluster_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,41 +41,41 @@ func NewDefaultDMClusterControl(
dcControl controller.DMClusterControlInterface,
masterMemberManager manager.DMManager,
workerMemberManager manager.DMManager,
//reclaimPolicyManager manager.DMManager,
reclaimPolicyManager manager.DMManager,
//metaManager manager.DMManager,
//orphanPodsCleaner member.OrphanPodsCleaner,
orphanPodsCleaner member.OrphanPodsCleaner,
pvcCleaner member.PVCCleanerInterface,
pvcResizer member.PVCResizerInterface,
//podRestarter member.PodRestarter,
podRestarter member.PodRestarter,
conditionUpdater DMClusterConditionUpdater,
recorder record.EventRecorder) ControlInterface {
return &defaultDMClusterControl{
dcControl,
masterMemberManager,
workerMemberManager,
//reclaimPolicyManager,
reclaimPolicyManager,
//metaManager,
//orphanPodsCleaner,
orphanPodsCleaner,
pvcCleaner,
//podRestarter,
podRestarter,
pvcResizer,
conditionUpdater,
recorder,
}
}

type defaultDMClusterControl struct {
dcControl controller.DMClusterControlInterface
masterMemberManager manager.DMManager
workerMemberManager manager.DMManager
//reclaimPolicyManager manager.DMManager
dcControl controller.DMClusterControlInterface
masterMemberManager manager.DMManager
workerMemberManager manager.DMManager
reclaimPolicyManager manager.DMManager
//metaManager manager.DMManager
//orphanPodsCleaner member.OrphanPodsCleaner
pvcCleaner member.PVCCleanerInterface
//podRestarter member.PodRestarter
pvcResizer member.PVCResizerInterface
conditionUpdater DMClusterConditionUpdater
recorder record.EventRecorder
orphanPodsCleaner member.OrphanPodsCleaner
pvcCleaner member.PVCCleanerInterface
podRestarter member.PodRestarter
pvcResizer member.PVCResizerInterface
conditionUpdater DMClusterConditionUpdater
recorder record.EventRecorder
}

// UpdateStatefulSet executes the core logic loop for a dmcluster.
Expand Down Expand Up @@ -123,29 +123,25 @@ func (dcc *defaultDMClusterControl) validate(dc *v1alpha1.DMCluster) bool {

func (dcc *defaultDMClusterControl) updateDMCluster(dc *v1alpha1.DMCluster) error {
var errs []error
// TODO: implement reclaimPolicyManager
// syncing all PVs managed by operator's reclaim policy to Retain
// if err := dcc.reclaimPolicyManager.Sync(dc); err != nil {
// return err
// }
if err := dcc.reclaimPolicyManager.SyncDM(dc); err != nil {
return err
}

// TODO: add orphanPodsCleaner for dm cluster
// cleaning all orphan pods(pd, tikv or tiflash which don't have a related PVC) managed by operator
// skipReasons, err := dcc.orphanPodsCleaner.Clean(dc)
// if err != nil {
// return err
// }
// if klog.V(10) {
// for podName, reason := range skipReasons {
// klog.Infof("pod %s of cluster %s/%s is skipped, reason %q", podName, dc.Namespace, dc.Name, reason)
// }
// }
// cleaning all orphan pods(dm-master or dm-worker which don't have a related PVC) managed by operator
skipReasons, err := dcc.orphanPodsCleaner.Clean(dc)
if err != nil {
return err
}
if klog.V(10) {
for podName, reason := range skipReasons {
klog.Infof("pod %s of cluster %s/%s is skipped, reason %q", podName, dc.Namespace, dc.Name, reason)
}
}

// TODO: restarted pods in dm cluster
// sync all the pods which need to be restarted
// if err := dcc.podRestarter.Sync(dc); err != nil {
// return err
// }
if err := dcc.podRestarter.Sync(dc); err != nil {
return err
}

// works that should do to making the dm-master cluster current state match the desired state:
// - create or update the dm-master service
Expand All @@ -158,7 +154,7 @@ func (dcc *defaultDMClusterControl) updateDMCluster(dc *v1alpha1.DMCluster) erro
// - upgrade the dm-master cluster
// - scale out/in the dm-master cluster
// - failover the dm-master cluster
if err := dcc.masterMemberManager.Sync(dc); err != nil {
if err := dcc.masterMemberManager.SyncDM(dc); err != nil {
errs = append(errs, err)
}

Expand All @@ -170,7 +166,7 @@ func (dcc *defaultDMClusterControl) updateDMCluster(dc *v1alpha1.DMCluster) erro
// - upgrade the dm-worker cluster
// - scale out/in the dm-worker cluster
// - failover the dm-worker cluster
if err := dcc.workerMemberManager.Sync(dc); err != nil {
if err := dcc.workerMemberManager.SyncDM(dc); err != nil {
errs = append(errs, err)
}

Expand All @@ -182,16 +178,15 @@ func (dcc *defaultDMClusterControl) updateDMCluster(dc *v1alpha1.DMCluster) erro
// return err
// }

// TODO: clean pods cleaning the pod scheduling annotation for dm cluster
// pvcSkipReasons, err := dcc.pvcCleaner.Clean(dc)
// if err != nil {
// return err
// }
// if klog.V(10) {
// for pvcName, reason := range pvcSkipReasons {
// klog.Infof("pvc %s of cluster %s/%s is skipped, reason %q", pvcName, dc.Namespace, dc.Name, reason)
// }
// }
pvcSkipReasons, err := dcc.pvcCleaner.Clean(dc)
if err != nil {
return err
}
if klog.V(10) {
for pvcName, reason := range pvcSkipReasons {
klog.Infof("pvc %s of cluster %s/%s is skipped, reason %q", pvcName, dc.Namespace, dc.Name, reason)
}
}

// TODO: sync dm cluster attributes
// syncing the some tidbcluster status attributes
Expand Down
32 changes: 19 additions & 13 deletions pkg/controller/dmcluster/dm_cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/pingcap/tidb-operator/pkg/dmapi"
mm "github.com/pingcap/tidb-operator/pkg/manager/member"
"github.com/pingcap/tidb-operator/pkg/manager/meta"

apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -97,9 +98,12 @@ func NewController(
svcControl := controller.NewRealServiceControl(kubeCli, svcInformer.Lister(), recorder)
pvControl := controller.NewRealPVControl(kubeCli, pvcInformer.Lister(), pvInformer.Lister(), recorder)
pvcControl := controller.NewRealPVCControl(kubeCli, recorder, pvcInformer.Lister())
//podControl := controller.NewRealPodControl(kubeCli, nil, podInformer.Lister(), recorder)
podControl := controller.NewRealPodControl(kubeCli, nil, podInformer.Lister(), recorder)
typedControl := controller.NewTypedControl(controller.NewRealGenericControl(genericCli, recorder))
masterScaler := mm.NewMasterScaler(masterControl, pvcInformer.Lister(), pvcControl)
masterUpgrader := mm.NewMasterUpgrader(masterControl, podInformer.Lister())
workerScaler := mm.NewWorkerScaler(pvcInformer.Lister(), pvcControl)
podRestarter := mm.NewPodRestarter(kubeCli, podInformer.Lister())

dcc := &Controller{
kubeClient: kubeCli,
Expand All @@ -116,6 +120,7 @@ func NewController(
podInformer.Lister(),
epsInformer.Lister(),
pvcInformer.Lister(),
masterScaler,
masterUpgrader,
),
mm.NewWorkerMemberManager(
Expand All @@ -126,12 +131,13 @@ func NewController(
setInformer.Lister(),
svcInformer.Lister(),
podInformer.Lister(),
workerScaler,
),
meta.NewReclaimPolicyDMManager(
pvcInformer.Lister(),
pvInformer.Lister(),
pvControl,
),
//meta.NewReclaimPolicyManager(
// pvcInformer.Lister(),
// pvInformer.Lister(),
// pvControl,
//),
//meta.NewMetaManager(
// pvcInformer.Lister(),
// pvcControl,
Expand All @@ -140,12 +146,12 @@ func NewController(
// podInformer.Lister(),
// podControl,
//),
//mm.NewOrphanPodsCleaner(
// podInformer.Lister(),
// podControl,
// pvcInformer.Lister(),
// kubeCli,
//),
mm.NewOrphanPodsCleaner(
podInformer.Lister(),
podControl,
pvcInformer.Lister(),
kubeCli,
),
mm.NewRealPVCCleaner(
kubeCli,
podInformer.Lister(),
Expand All @@ -160,7 +166,7 @@ func NewController(
scInformer,
),
//mm.NewDMClusterStatusManager(kubeCli, cli, scalerInformer.Lister(), tikvGroupInformer.Lister()),
//podRestarter,
podRestarter,
&dmClusterConditionUpdater{},
recorder,
),
Expand Down
30 changes: 30 additions & 0 deletions pkg/dmapi/dmapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type MasterClient interface {
GetWorkers() ([]*WorkersInfo, error)
GetLeader() (MembersLeader, error)
EvictLeader() error
DeleteMaster(name string) error
DeleteWorker(name string) error
}

var (
Expand Down Expand Up @@ -195,6 +197,34 @@ func (mc *masterClient) EvictLeader() error {
return nil
}

// deleteMember issues a DELETE against the dm-master members API for the
// given query suffix (e.g. "/master/<name>" or "/worker/<name>") and
// verifies that dm-master reports a successful result in the response body.
func (mc *masterClient) deleteMember(query string) error {
	apiURL := fmt.Sprintf("%s/%s%s", mc.url, membersPrefix, query)
	body, err := httputil.DeleteBodyOK(mc.httpClient, apiURL)
	if err != nil {
		return err
	}
	// Fix: local was previously misspelled "deleteMemeberResp"; also use %v
	// for the error value instead of the string verb %s.
	deleteMemberResp := &RespHeader{}
	err = json.Unmarshal(body, deleteMemberResp)
	if err != nil {
		return fmt.Errorf("unable to unmarshal delete member resp: %s, query: %s, err: %v", body, query, err)
	}
	if !deleteMemberResp.Result {
		return fmt.Errorf("unable to delete member, query: %s, err: %s", query, deleteMemberResp.Msg)
	}

	return nil
}

// DeleteMaster removes the dm-master member with the given name from the
// DM cluster via the members API.
func (mc *masterClient) DeleteMaster(name string) error {
	return mc.deleteMember("/master/" + name)
}

// DeleteWorker removes the dm-worker member with the given name from the
// DM cluster via the members API.
func (mc *masterClient) DeleteWorker(name string) error {
	return mc.deleteMember("/worker/" + name)
}

// NewMasterClient returns a new MasterClient
func NewMasterClient(url string, timeout time.Duration, tlsConfig *tls.Config, disableKeepalive bool) MasterClient {
return &masterClient{
Expand Down
Loading