From a7e16bf53618ee60f92d5bf714455b53e98e4fab Mon Sep 17 00:00:00 2001 From: shuijing198799 Date: Fri, 15 Mar 2019 21:32:25 +0800 Subject: [PATCH 1/6] add schduler test case --- tests/actions.go | 221 +++++++++++++++++++++++++++++++++++-- tests/backup/backupcase.go | 15 +++ tests/cmd/e2e/main.go | 7 ++ 3 files changed, 235 insertions(+), 8 deletions(-) diff --git a/tests/actions.go b/tests/actions.go index 61b50b11c3..27c1a93fc8 100644 --- a/tests/actions.go +++ b/tests/actions.go @@ -31,8 +31,10 @@ import ( "github.com/pingcap/tidb-operator/pkg/label" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" ) @@ -48,6 +50,11 @@ func NewOperatorActions(cli versioned.Interface, kubeCli kubernetes.Interface) O const ( DefaultPollTimeout time.Duration = 10 * time.Minute DefaultPollInterval time.Duration = 1 * time.Minute + getBackupDirPodName = "get-backup-dir" +) + +var ( + backupDir string ) type OperatorActions interface { @@ -74,6 +81,7 @@ type OperatorActions interface { CleanMonitor(info *TidbClusterInfo) error ForceDeploy(info *TidbClusterInfo) error CreateSecret(info *TidbClusterInfo) error + getBackupDir(info *TidbClusterInfo) (string, error) } type FaultTriggerActions interface { @@ -113,6 +121,7 @@ type OperatorInfo struct { } type TidbClusterInfo struct { + Name string Namespace string ClusterName string OperatorTag string @@ -121,6 +130,7 @@ type TidbClusterInfo struct { TiDBImage string StorageClassName string Password string + InitSql string RecordCount string InsertBetchSize string Resources map[string]string @@ -129,9 +139,6 @@ type TidbClusterInfo struct { func (tc *TidbClusterInfo) HelmSetString() string { - // add a database and table for test - initSql := `"create database record;use record;create table test(t char(32));"` - set := map[string]string{ "clusterName": tc.ClusterName, "pd.storageClassName": tc.StorageClassName, @@ -143,7 +150,7 @@ func (tc *TidbClusterInfo) HelmSetString() string { "tikv.image": tc.TiKVImage, "tidb.image": tc.TiDBImage, "tidb.passwordSecretName": "set-secret", - "tidb.initSql": initSql, + "tidb.initSql": tc.InitSql, } for k, v := range tc.Resources { @@ -243,6 +250,7 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterInfo) error { info.ClusterName, fmt.Sprintf("%s-backup", info.ClusterName), fmt.Sprintf("%s-restore", info.ClusterName), + fmt.Sprintf("%s-scheduler-backup", info.ClusterName), } for _, chartName := range charts { res, err := exec.Command("helm", "del", "--purge", chartName).CombinedOutput() @@ -252,6 +260,11 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterInfo) error { } } + _, err := oa.kubeCli.CoreV1().Pods(info.Namespace).Get(getBackupDirPodName, metav1.GetOptions{}) + if !errors.IsNotFound(err) { + oa.kubeCli.CoreV1().Pods(info.Namespace).Delete(getBackupDirPodName, &metav1.DeleteOptions{}) + } + setStr := label.New().Instance(info.ClusterName).String() resources := []string{"pvc"} @@ -847,9 +860,11 @@ func (oa *operatorActions) DeployAdHocBackup(info *TidbClusterInfo) error { defer func() { glog.Infof("deploy adhoc backup end cluster[%s] namespace[%s]", info.ClusterName, info.Namespace) }() + // I need a name for pvc and another name for backupDir, + // But in operator, there are the same sets := map[string]string{ "clusterName": info.ClusterName, - "name": "test-backup", + 
"name": info.Name, "mode": "backup", "user": "root", "password": info.Password, @@ -873,6 +888,7 @@ func (oa *operatorActions) DeployAdHocBackup(info *TidbClusterInfo) error { if err != nil { return fmt.Errorf("failed to launch adhoc backup job: %v, %s", err, string(res)) } + return nil } @@ -882,7 +898,7 @@ func (oa *operatorActions) CheckAdHocBackup(info *TidbClusterInfo) error { glog.Infof("deploy clean backup end cluster[%s] namespace[%s]", info.ClusterName, info.Namespace) }() - jobName := fmt.Sprintf("%s-%s", info.ClusterName, "test-backup") + jobName := fmt.Sprintf("%s-%s", info.ClusterName, info.Name) fn := func() (bool, error) { job, err := oa.kubeCli.BatchV1().Jobs(info.Namespace).Get(jobName, metav1.GetOptions{}) if err != nil { @@ -901,6 +917,12 @@ func (oa *operatorActions) CheckAdHocBackup(info *TidbClusterInfo) error { if err != nil { return fmt.Errorf("failed to launch scheduler backup job: %v", err) } + + backupDir, err = oa.getBackupDir(info) + if err != nil { + return fmt.Errorf("failed to get backup dir: %v", err) + } + return nil } @@ -911,7 +933,7 @@ func (oa *operatorActions) Restore(from *TidbClusterInfo, to *TidbClusterInfo) e }() sets := map[string]string{ "clusterName": to.ClusterName, - "name": "test-backup", + "name": to.Name, "mode": "restore", "user": "root", "password": to.Password, @@ -945,7 +967,7 @@ func (oa *operatorActions) CheckRestore(from *TidbClusterInfo, to *TidbClusterIn glog.Infof("check restore end cluster[%s] namespace[%s]", to.ClusterName, to.Namespace) }() - jobName := fmt.Sprintf("%s-restore-test-backup", to.ClusterName) + jobName := fmt.Sprintf("%s-restore-%s", to.ClusterName, from.Name) fn := func() (bool, error) { job, err := oa.kubeCli.BatchV1().Jobs(to.Namespace).Get(jobName, metav1.GetOptions{}) if err != nil { @@ -1065,13 +1087,196 @@ func releaseIsExist(err error) bool { } func (oa *operatorActions) DeployScheduledBackup(info *TidbClusterInfo) error { + glog.Infof("begin to deploy scheduled backup") + defer func() { + glog.Infof("deploy shceduled backup end") + }() + minute := time.Now().Minute() + cron := fmt.Sprintf("'%d * * * *'", (minute+5)%60) + sets := map[string]string{ + "clusterName": info.ClusterName, + "scheduledBackup.create": "true", + "scheduledBackup.user": "root", + "scheduledBackup.password": info.Password, + "scheduledBackup.schedule": cron, + "scheduledBackup.storage": "10Gi", + } + var buffer bytes.Buffer + for k, v := range sets { + set := fmt.Sprintf(" --set %s=%s", k, v) + _, err := buffer.WriteString(set) + if err != nil { + return err + } + } + + setStr := buffer.String() + + cmd := fmt.Sprintf("helm upgrade %s /charts/%s/tidb-cluster %s", + info.ClusterName, info.OperatorTag, setStr) + + glog.Infof("scheduled-backup delploy [%s]", cmd) + res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput() + if err != nil { + return fmt.Errorf("failed to launch scheduler backup job: %v, %s", err, string(res)) + } return nil } func (oa *operatorActions) CheckScheduledBackup(info *TidbClusterInfo) error { + glog.Infof("begin to check scheduler backup cluster[%s] namespace[%s]", info.ClusterName, info.Namespace) + defer func() { + glog.Infof("deploy check scheduler end cluster[%s] namespace[%s]", info.ClusterName, info.Namespace) + }() + + jobName := fmt.Sprintf("%s-scheduled-backup", info.ClusterName) + fn := func() (bool, error) { + job, err := oa.kubeCli.BatchV1beta1().CronJobs(info.Namespace).Get(jobName, metav1.GetOptions{}) + if err != nil { + glog.Errorf("failed to get cronjobs %s ,%v", jobName, err) + return 
false, nil + } + + jobs, err := oa.kubeCli.BatchV1().Jobs(info.Namespace).List(metav1.ListOptions{}) + if err != nil { + glog.Errorf("failed to list jobs %s ,%v", info.Namespace, err) + return false, nil + } + + backupJobs := []batchv1.Job{} + for _, j := range jobs.Items { + if pid, found := getParentUIDFromJob(j); found && pid == job.UID { + backupJobs = append(backupJobs, j) + } + } + + if len(backupJobs) == 0 { + glog.Errorf("cluster [%s] scheduler jobs is creating, please wait!", info.ClusterName) + return false, nil + } + + for _, j := range backupJobs { + if j.Status.Succeeded == 0 { + glog.Errorf("cluster [%s] back up job is not completed, please wait! ", info.ClusterName) + return false, nil + } + } + + return true, nil + } + + err := wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn) + if err != nil { + return fmt.Errorf("failed to launch scheduler backup job: %v", err) + } + + backupDir, err = oa.getBackupDir(info) + if err != nil { + return fmt.Errorf("failed to get backup dir: %v", err) + } + return nil } +func getParentUIDFromJob(j batchv1.Job) (types.UID, bool) { + controllerRef := metav1.GetControllerOf(&j) + + if controllerRef == nil { + return types.UID(""), false + } + + if controllerRef.Kind != "CronJob" { + glog.Infof("Job with non-CronJob parent, name %s namespace %s", j.Name, j.Namespace) + return types.UID(""), false + } + + return controllerRef.UID, true +} + +func (oa *operatorActions) getBackupDir(info *TidbClusterInfo) (string, error) { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: getBackupDirPodName, + Namespace: info.Namespace, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: getBackupDirPodName, + Image: "pingcap/tidb-cloud-backup:latest", + Command: []string{"sleep", "3000"}, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "data", + MountPath: "/data", + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "data", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: info.Name, + }, + }, + }, + }, + }, + } + + fn := func() (bool, error) { + _, err := oa.kubeCli.CoreV1().Pods(info.Namespace).Get(getBackupDirPodName, metav1.GetOptions{}) + if !errors.IsNotFound(err) { + return false, nil + } + return true, nil + } + + err := wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn) + + if err != nil { + return "", fmt.Errorf("failed to delete pod %s", getBackupDirPodName) + } + + _, err = oa.kubeCli.CoreV1().Pods(info.Namespace).Create(pod) + if err != nil && !errors.IsAlreadyExists(err) { + glog.Errorf("cluster: [%s/%s] create get backup dir pod failed, error :%v", info.Namespace, info.ClusterName, err) + return "", err + } + + fn = func() (bool, error) { + _, err := oa.kubeCli.CoreV1().Pods(info.Namespace).Get(getBackupDirPodName, metav1.GetOptions{}) + if errors.IsNotFound(err) { + return false, nil + } + return true, nil + } + + err = wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn) + + if err != nil { + return "", fmt.Errorf("failed to create pod %s", getBackupDirPodName) + } + + cmd := fmt.Sprintf("kubectl exec %s -n %s ls /data", getBackupDirPodName, info.Namespace) + glog.Infof(cmd) + res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput() + if err != nil { + glog.Errorf("cluster:[%s/%s] exec :%s failed,error:%v,result:%s", info.Namespace, info.ClusterName, cmd, err, res) + return "", err + } + + dirs := strings.Split(string(res), "\n") + glog.Infof("dirs in pod info name [%s] dir name [%s]", info.Name, strings.Join(dirs, 
",")) + return strings.TrimSpace(dirs[0]), nil +} + +func (info *TidbClusterInfo) FullName() string { + return fmt.Sprintf("%s/%s", info.Namespace, info.ClusterName) +} + func (oa *operatorActions) DeployIncrementalBackup(from *TidbClusterInfo, to *TidbClusterInfo) error { return nil } diff --git a/tests/backup/backupcase.go b/tests/backup/backupcase.go index 07c5da0fec..2eb544842c 100644 --- a/tests/backup/backupcase.go +++ b/tests/backup/backupcase.go @@ -70,5 +70,20 @@ func (bc *BackupCase) Run() error { return err } + bc.srcCluster.Name = "demo-scheduled-backup" + bc.desCluster.Name = "demo-scheduled-backup" + + err = bc.operator.DeployScheduledBackup(bc.srcCluster) + if err != nil { + glog.Errorf("cluster:[%s] scheduler happen error: %v", bc.srcCluster.ClusterName, err) + return err + } + + err = bc.operator.CheckScheduledBackup(bc.srcCluster) + if err != nil { + glog.Errorf("cluster:[%s] scheduler failed error: %v", bc.srcCluster.ClusterName, err) + return err + } + return nil } diff --git a/tests/cmd/e2e/main.go b/tests/cmd/e2e/main.go index 607ee90b55..9bd15eaf78 100644 --- a/tests/cmd/e2e/main.go +++ b/tests/cmd/e2e/main.go @@ -61,7 +61,11 @@ func main() { glog.Fatal(err) } + // create database and table and insert a column for test backup and restore + initSql := `"create database record;use record;create table test(t char(32))"` + clusterInfo := &tests.TidbClusterInfo{ + Name: "test-backup", Namespace: "tidb", ClusterName: "demo", OperatorTag: "master", @@ -70,6 +74,7 @@ func main() { TiDBImage: "pingcap/tidb:v2.1.3", StorageClassName: "local-storage", Password: "admin", + InitSql: initSql, Args: map[string]string{}, } @@ -88,6 +93,7 @@ func main() { } restoreClusterInfo := &tests.TidbClusterInfo{ + Name: "test-backup", Namespace: "tidb", ClusterName: "demo2", OperatorTag: "master", @@ -96,6 +102,7 @@ func main() { TiDBImage: "pingcap/tidb:v2.1.3", StorageClassName: "local-storage", Password: "admin", + InitSql: initSql, Args: map[string]string{}, } From 2746c823452fac6c76893551a00b61f62e7e5d2a Mon Sep 17 00:00:00 2001 From: shuijing198799 Date: Mon, 18 Mar 2019 21:50:43 +0800 Subject: [PATCH 2/6] delete unnessary step and add scheduled-backup-job check step --- tests/actions.go | 47 ++++++++++++++++++-------------------- tests/backup/backupcase.go | 7 ------ tests/cmd/e2e/main.go | 12 +++++----- 3 files changed, 28 insertions(+), 38 deletions(-) diff --git a/tests/actions.go b/tests/actions.go index 27c1a93fc8..f559cae928 100644 --- a/tests/actions.go +++ b/tests/actions.go @@ -49,14 +49,10 @@ func NewOperatorActions(cli versioned.Interface, kubeCli kubernetes.Interface) O const ( DefaultPollTimeout time.Duration = 10 * time.Minute - DefaultPollInterval time.Duration = 1 * time.Minute + DefaultPollInterval time.Duration = 10 * time.Second getBackupDirPodName = "get-backup-dir" ) -var ( - backupDir string -) - type OperatorActions interface { DeployOperator(info *OperatorInfo) error CleanOperator(info *OperatorInfo) error @@ -81,7 +77,7 @@ type OperatorActions interface { CleanMonitor(info *TidbClusterInfo) error ForceDeploy(info *TidbClusterInfo) error CreateSecret(info *TidbClusterInfo) error - getBackupDir(info *TidbClusterInfo) (string, error) + getBackupDir(info *TidbClusterInfo) (int, error) } type FaultTriggerActions interface { @@ -260,9 +256,10 @@ func (oa *operatorActions) CleanTidbCluster(info *TidbClusterInfo) error { } } - _, err := oa.kubeCli.CoreV1().Pods(info.Namespace).Get(getBackupDirPodName, metav1.GetOptions{}) - if !errors.IsNotFound(err) { - 
oa.kubeCli.CoreV1().Pods(info.Namespace).Delete(getBackupDirPodName, &metav1.DeleteOptions{}) + err := oa.kubeCli.CoreV1().Pods(info.Namespace).Delete(getBackupDirPodName, &metav1.DeleteOptions{}) + + if err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("failed to delete dir pod %v", err) } setStr := label.New().Instance(info.ClusterName).String() @@ -860,8 +857,6 @@ func (oa *operatorActions) DeployAdHocBackup(info *TidbClusterInfo) error { defer func() { glog.Infof("deploy adhoc backup end cluster[%s] namespace[%s]", info.ClusterName, info.Namespace) }() - // I need a name for pvc and another name for backupDir, - // But in operator, there are the same sets := map[string]string{ "clusterName": info.ClusterName, "name": info.Name, @@ -918,11 +913,6 @@ func (oa *operatorActions) CheckAdHocBackup(info *TidbClusterInfo) error { return fmt.Errorf("failed to launch scheduler backup job: %v", err) } - backupDir, err = oa.getBackupDir(info) - if err != nil { - return fmt.Errorf("failed to get backup dir: %v", err) - } - return nil } @@ -1091,8 +1081,8 @@ func (oa *operatorActions) DeployScheduledBackup(info *TidbClusterInfo) error { defer func() { glog.Infof("deploy shceduled backup end") }() - minute := time.Now().Minute() - cron := fmt.Sprintf("'%d * * * *'", (minute+5)%60) + + cron := fmt.Sprintf("'*/1 * * * *'") sets := map[string]string{ "clusterName": info.ClusterName, "scheduledBackup.create": "true", @@ -1170,11 +1160,18 @@ func (oa *operatorActions) CheckScheduledBackup(info *TidbClusterInfo) error { return fmt.Errorf("failed to launch scheduler backup job: %v", err) } - backupDir, err = oa.getBackupDir(info) + // sleep 1 minute for cronjob + time.Sleep(60 * time.Second) + + dirs, err := oa.getBackupDir(info) if err != nil { return fmt.Errorf("failed to get backup dir: %v", err) } + if dirs != 3 { + return fmt.Errorf("scheduler job failed!") + } + return nil } @@ -1193,7 +1190,7 @@ func getParentUIDFromJob(j batchv1.Job) (types.UID, bool) { return controllerRef.UID, true } -func (oa *operatorActions) getBackupDir(info *TidbClusterInfo) (string, error) { +func (oa *operatorActions) getBackupDir(info *TidbClusterInfo) (int, error) { pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: getBackupDirPodName, @@ -1237,13 +1234,13 @@ func (oa *operatorActions) getBackupDir(info *TidbClusterInfo) (string, error) { err := wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn) if err != nil { - return "", fmt.Errorf("failed to delete pod %s", getBackupDirPodName) + return 0, fmt.Errorf("failed to delete pod %s", getBackupDirPodName) } _, err = oa.kubeCli.CoreV1().Pods(info.Namespace).Create(pod) if err != nil && !errors.IsAlreadyExists(err) { glog.Errorf("cluster: [%s/%s] create get backup dir pod failed, error :%v", info.Namespace, info.ClusterName, err) - return "", err + return 0, err } fn = func() (bool, error) { @@ -1257,7 +1254,7 @@ func (oa *operatorActions) getBackupDir(info *TidbClusterInfo) (string, error) { err = wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn) if err != nil { - return "", fmt.Errorf("failed to create pod %s", getBackupDirPodName) + return 0, fmt.Errorf("failed to create pod %s", getBackupDirPodName) } cmd := fmt.Sprintf("kubectl exec %s -n %s ls /data", getBackupDirPodName, info.Namespace) @@ -1265,12 +1262,12 @@ func (oa *operatorActions) getBackupDir(info *TidbClusterInfo) (string, error) { res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput() if err != nil { glog.Errorf("cluster:[%s/%s] exec :%s failed,error:%v,result:%s", 
info.Namespace, info.ClusterName, cmd, err, res) - return "", err + return 0, err } dirs := strings.Split(string(res), "\n") glog.Infof("dirs in pod info name [%s] dir name [%s]", info.Name, strings.Join(dirs, ",")) - return strings.TrimSpace(dirs[0]), nil + return len(dirs), nil } func (info *TidbClusterInfo) FullName() string { diff --git a/tests/backup/backupcase.go b/tests/backup/backupcase.go index 2eb544842c..e462106576 100644 --- a/tests/backup/backupcase.go +++ b/tests/backup/backupcase.go @@ -46,12 +46,6 @@ func (bc *BackupCase) Run() error { return err } - err = bc.operator.ForceDeploy(bc.desCluster) - if err != nil { - glog.Errorf("cluster:[%s] deploy happen error: %v", bc.desCluster.ClusterName, err) - return err - } - err = bc.operator.CheckTidbClusterStatus(bc.desCluster) if err != nil { glog.Errorf("cluster:[%s] deploy faild error: %v", bc.desCluster.ClusterName, err) @@ -71,7 +65,6 @@ func (bc *BackupCase) Run() error { } bc.srcCluster.Name = "demo-scheduled-backup" - bc.desCluster.Name = "demo-scheduled-backup" err = bc.operator.DeployScheduledBackup(bc.srcCluster) if err != nil { diff --git a/tests/cmd/e2e/main.go b/tests/cmd/e2e/main.go index 9bd15eaf78..cd2fdc46c1 100644 --- a/tests/cmd/e2e/main.go +++ b/tests/cmd/e2e/main.go @@ -69,9 +69,9 @@ func main() { Namespace: "tidb", ClusterName: "demo", OperatorTag: "master", - PDImage: "pingcap/pd:v2.1.3", - TiKVImage: "pingcap/tikv:v2.1.3", - TiDBImage: "pingcap/tidb:v2.1.3", + PDImage: "pingcap/pd:v2.1.0", + TiKVImage: "pingcap/tikv:v2.1.0", + TiDBImage: "pingcap/tidb:v2.1.0", StorageClassName: "local-storage", Password: "admin", InitSql: initSql, @@ -97,9 +97,9 @@ func main() { Namespace: "tidb", ClusterName: "demo2", OperatorTag: "master", - PDImage: "pingcap/pd:v2.1.3", - TiKVImage: "pingcap/tikv:v2.1.3", - TiDBImage: "pingcap/tidb:v2.1.3", + PDImage: "pingcap/pd:v2.1.0", + TiKVImage: "pingcap/tikv:v2.1.0", + TiDBImage: "pingcap/tidb:v2.1.0", StorageClassName: "local-storage", Password: "admin", InitSql: initSql, From 79dfe9a2594d31352bd4eac0880ba0df824f7bf9 Mon Sep 17 00:00:00 2001 From: shuijing198799 Date: Tue, 19 Mar 2019 11:42:40 +0800 Subject: [PATCH 3/6] change name to backupPVC --- tests/actions.go | 14 +++++++------- tests/backup/backupcase.go | 2 +- tests/cmd/e2e/main.go | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/actions.go b/tests/actions.go index f559cae928..5dc973b6cf 100644 --- a/tests/actions.go +++ b/tests/actions.go @@ -117,7 +117,7 @@ type OperatorInfo struct { } type TidbClusterInfo struct { - Name string + BackupPVC string Namespace string ClusterName string OperatorTag string @@ -859,7 +859,7 @@ func (oa *operatorActions) DeployAdHocBackup(info *TidbClusterInfo) error { }() sets := map[string]string{ "clusterName": info.ClusterName, - "name": info.Name, + "name": info.BackupPVC, "mode": "backup", "user": "root", "password": info.Password, @@ -893,7 +893,7 @@ func (oa *operatorActions) CheckAdHocBackup(info *TidbClusterInfo) error { glog.Infof("deploy clean backup end cluster[%s] namespace[%s]", info.ClusterName, info.Namespace) }() - jobName := fmt.Sprintf("%s-%s", info.ClusterName, info.Name) + jobName := fmt.Sprintf("%s-%s", info.ClusterName, info.BackupPVC) fn := func() (bool, error) { job, err := oa.kubeCli.BatchV1().Jobs(info.Namespace).Get(jobName, metav1.GetOptions{}) if err != nil { @@ -923,7 +923,7 @@ func (oa *operatorActions) Restore(from *TidbClusterInfo, to *TidbClusterInfo) e }() sets := map[string]string{ "clusterName": to.ClusterName, - "name": 
to.Name, + "name": to.BackupPVC, "mode": "restore", "user": "root", "password": to.Password, @@ -957,7 +957,7 @@ func (oa *operatorActions) CheckRestore(from *TidbClusterInfo, to *TidbClusterIn glog.Infof("check restore end cluster[%s] namespace[%s]", to.ClusterName, to.Namespace) }() - jobName := fmt.Sprintf("%s-restore-%s", to.ClusterName, from.Name) + jobName := fmt.Sprintf("%s-restore-%s", to.ClusterName, from.BackupPVC) fn := func() (bool, error) { job, err := oa.kubeCli.BatchV1().Jobs(to.Namespace).Get(jobName, metav1.GetOptions{}) if err != nil { @@ -1215,7 +1215,7 @@ func (oa *operatorActions) getBackupDir(info *TidbClusterInfo) (int, error) { Name: "data", VolumeSource: corev1.VolumeSource{ PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: info.Name, + ClaimName: info.BackupPVC, }, }, }, @@ -1266,7 +1266,7 @@ func (oa *operatorActions) getBackupDir(info *TidbClusterInfo) (int, error) { } dirs := strings.Split(string(res), "\n") - glog.Infof("dirs in pod info name [%s] dir name [%s]", info.Name, strings.Join(dirs, ",")) + glog.Infof("dirs in pod info name [%s] dir name [%s]", info.BackupPVC, strings.Join(dirs, ",")) return len(dirs), nil } diff --git a/tests/backup/backupcase.go b/tests/backup/backupcase.go index e462106576..dea051e9a4 100644 --- a/tests/backup/backupcase.go +++ b/tests/backup/backupcase.go @@ -64,7 +64,7 @@ func (bc *BackupCase) Run() error { return err } - bc.srcCluster.Name = "demo-scheduled-backup" + bc.srcCluster.BackupPVC = "demo-scheduled-backup" err = bc.operator.DeployScheduledBackup(bc.srcCluster) if err != nil { diff --git a/tests/cmd/e2e/main.go b/tests/cmd/e2e/main.go index cd2fdc46c1..87269228eb 100644 --- a/tests/cmd/e2e/main.go +++ b/tests/cmd/e2e/main.go @@ -65,7 +65,7 @@ func main() { initSql := `"create database record;use record;create table test(t char(32))"` clusterInfo := &tests.TidbClusterInfo{ - Name: "test-backup", + BackupPVC: "test-backup", Namespace: "tidb", ClusterName: "demo", OperatorTag: "master", @@ -93,7 +93,7 @@ func main() { } restoreClusterInfo := &tests.TidbClusterInfo{ - Name: "test-backup", + BackupPVC: "test-backup", Namespace: "tidb", ClusterName: "demo2", OperatorTag: "master", From 3dbddde6d20864349b6ce13a8b0565cfc4d2a18a Mon Sep 17 00:00:00 2001 From: shuijing198799 Date: Tue, 19 Mar 2019 11:49:37 +0800 Subject: [PATCH 4/6] return a string instead of int in getbackupdir --- tests/actions.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/actions.go b/tests/actions.go index 5dc973b6cf..5ffe6433f0 100644 --- a/tests/actions.go +++ b/tests/actions.go @@ -77,7 +77,7 @@ type OperatorActions interface { CleanMonitor(info *TidbClusterInfo) error ForceDeploy(info *TidbClusterInfo) error CreateSecret(info *TidbClusterInfo) error - getBackupDir(info *TidbClusterInfo) (int, error) + getBackupDir(info *TidbClusterInfo) ([]string, error) } type FaultTriggerActions interface { @@ -1168,7 +1168,7 @@ func (oa *operatorActions) CheckScheduledBackup(info *TidbClusterInfo) error { return fmt.Errorf("failed to get backup dir: %v", err) } - if dirs != 3 { + if len(dirs) != 3 { return fmt.Errorf("scheduler job failed!") } @@ -1190,7 +1190,7 @@ func getParentUIDFromJob(j batchv1.Job) (types.UID, bool) { return controllerRef.UID, true } -func (oa *operatorActions) getBackupDir(info *TidbClusterInfo) (int, error) { +func (oa *operatorActions) getBackupDir(info *TidbClusterInfo) ([]string, error) { pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: getBackupDirPodName, 
@@ -1234,13 +1234,13 @@ func (oa *operatorActions) getBackupDir(info *TidbClusterInfo) (int, error) { err := wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn) if err != nil { - return 0, fmt.Errorf("failed to delete pod %s", getBackupDirPodName) + return nil, fmt.Errorf("failed to delete pod %s", getBackupDirPodName) } _, err = oa.kubeCli.CoreV1().Pods(info.Namespace).Create(pod) if err != nil && !errors.IsAlreadyExists(err) { glog.Errorf("cluster: [%s/%s] create get backup dir pod failed, error :%v", info.Namespace, info.ClusterName, err) - return 0, err + return nil, err } fn = func() (bool, error) { @@ -1254,7 +1254,7 @@ func (oa *operatorActions) getBackupDir(info *TidbClusterInfo) (int, error) { err = wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn) if err != nil { - return 0, fmt.Errorf("failed to create pod %s", getBackupDirPodName) + return nil, fmt.Errorf("failed to create pod %s", getBackupDirPodName) } cmd := fmt.Sprintf("kubectl exec %s -n %s ls /data", getBackupDirPodName, info.Namespace) @@ -1262,12 +1262,12 @@ func (oa *operatorActions) getBackupDir(info *TidbClusterInfo) (int, error) { res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput() if err != nil { glog.Errorf("cluster:[%s/%s] exec :%s failed,error:%v,result:%s", info.Namespace, info.ClusterName, cmd, err, res) - return 0, err + return nil, err } dirs := strings.Split(string(res), "\n") glog.Infof("dirs in pod info name [%s] dir name [%s]", info.BackupPVC, strings.Join(dirs, ",")) - return len(dirs), nil + return dirs, nil } func (info *TidbClusterInfo) FullName() string { From 84d4698800d1793fc60bfbbe979fe31da528482b Mon Sep 17 00:00:00 2001 From: shuijing198799 Date: Tue, 19 Mar 2019 14:26:57 +0800 Subject: [PATCH 5/6] resolve some merge error and naming conflict --- tests/actions.go | 6 +- tests/pkg/blockWriter/blockWriter.go | 273 --------------------------- 2 files changed, 3 insertions(+), 276 deletions(-) delete mode 100644 tests/pkg/blockWriter/blockWriter.go diff --git a/tests/actions.go b/tests/actions.go index eeec91b525..ee87c0cff6 100644 --- a/tests/actions.go +++ b/tests/actions.go @@ -28,7 +28,7 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/golang/glog" - "github.com/pingcap/errors" + pingcapErrors "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1" "github.com/pingcap/tidb-operator/pkg/client/clientset/versioned" @@ -418,7 +418,7 @@ func (oa *operatorActions) ScaleTidbCluster(info *TidbClusterInfo) error { glog.Info("[SCALE] " + cmd) res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput() if err != nil { - return errors.Wrapf(err, "failed to scale tidb cluster: %s", string(res)) + return pingcapErrors.Wrapf(err, "failed to scale tidb cluster: %s", string(res)) } return nil } @@ -429,7 +429,7 @@ func (oa *operatorActions) UpgradeTidbCluster(info *TidbClusterInfo) error { glog.Info("[UPGRADE] " + cmd) res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput() if err != nil { - return errors.Wrapf(err, "failed to upgrade tidb cluster: %s", string(res)) + return pingcapErrors.Wrapf(err, "failed to upgrade tidb cluster: %s", string(res)) } return nil } diff --git a/tests/pkg/blockWriter/blockWriter.go b/tests/pkg/blockWriter/blockWriter.go deleted file mode 100644 index 8434f151b0..0000000000 --- a/tests/pkg/blockWriter/blockWriter.go +++ /dev/null @@ -1,273 +0,0 @@ -// Copyright 2019 PingCAP, Inc. 
-// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License.package spec - -package blockwriter - -import ( - "context" - "database/sql" - "fmt" - "math/rand" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/golang/glog" - "github.com/pingcap/tidb-operator/tests/pkg/util" - "k8s.io/apimachinery/pkg/util/wait" -) - -const ( - queryChanSize int = 10000 -) - -// BlockWriterCase is for concurrent writing blocks. -type BlockWriterCase struct { - cfg Config - bws []*blockWriter - - isRunning uint32 - isInit uint32 - stopChan chan struct{} - - sync.RWMutex -} - -// Config defines the config of BlockWriterCase -type Config struct { - TableNum int - Concurrency int - BatchSize int - RawSize int -} - -type blockWriter struct { - rawSize int - values []string - batchSize int -} - -// NewBlockWriterCase returns the BlockWriterCase. -func NewBlockWriterCase(cfg Config) *BlockWriterCase { - c := &BlockWriterCase{ - cfg: cfg, - stopChan: make(chan struct{}, 1), - } - - if c.cfg.TableNum < 1 { - c.cfg.TableNum = 1 - } - c.initBlocks() - - return c -} - -func (c *BlockWriterCase) initBlocks() { - c.bws = make([]*blockWriter, c.cfg.Concurrency) - for i := 0; i < c.cfg.Concurrency; i++ { - c.bws[i] = c.newBlockWriter() - } -} - -func (c *BlockWriterCase) newBlockWriter() *blockWriter { - return &blockWriter{ - rawSize: c.cfg.RawSize, - values: make([]string, c.cfg.BatchSize), - batchSize: c.cfg.BatchSize, - } -} - -func (c *BlockWriterCase) generateQuery(ctx context.Context, queryChan chan []string, wg *sync.WaitGroup) { - defer func() { - glog.Infof("[%s] [action: generate Query] stopped", c) - wg.Done() - }() - - for { - tableN := rand.Intn(c.cfg.TableNum) - var index string - if tableN > 0 { - index = fmt.Sprintf("%d", tableN) - } - - var querys []string - for i := 0; i < 100; i++ { - values := make([]string, c.cfg.BatchSize) - for i := 0; i < c.cfg.BatchSize; i++ { - blockData := util.RandString(c.cfg.RawSize) - values[i] = fmt.Sprintf("('%s')", blockData) - } - - querys = append(querys, fmt.Sprintf( - "INSERT INTO block_writer%s(raw_bytes) VALUES %s", - index, strings.Join(values, ","))) - } - - select { - case <-ctx.Done(): - return - default: - if len(queryChan) < queryChanSize { - queryChan <- querys - } else { - glog.Infof("[%s] [action: generate Query] query channel is full, sleep 10 seconds", c) - util.Sleep(ctx, 10*time.Second) - } - } - } -} - -func (bw *blockWriter) batchExecute(db *sql.DB, query string) error { - _, err := db.Exec(query) - if err != nil { - glog.Errorf("[block_writer] exec sql [%s] failed, err: %v", query, err) - return err - } - - return nil -} - -func (bw *blockWriter) run(ctx context.Context, db *sql.DB, queryChan chan []string) { - for { - select { - case <-ctx.Done(): - return - default: - } - - querys, ok := <-queryChan - if !ok { - // No more query - return - } - - for _, query := range querys { - select { - case <-ctx.Done(): - return - default: - if err := bw.batchExecute(db, query); err != nil { - glog.Fatal(err) - } - } - } - } -} - -// Initialize inits case -func (c *BlockWriterCase) initialize(db *sql.DB) error { - glog.Infof("[%s] 
start to init...", c) - defer func() { - atomic.StoreUint32(&c.isInit, 1) - glog.Infof("[%s] init end...", c) - }() - - for i := 0; i < c.cfg.TableNum; i++ { - var s string - if i > 0 { - s = fmt.Sprintf("%d", i) - } - - tmt := fmt.Sprintf("CREATE TABLE IF NOT EXISTS block_writer%s %s", s, ` - ( - id BIGINT NOT NULL AUTO_INCREMENT, - raw_bytes BLOB NOT NULL, - PRIMARY KEY (id) -)`) - - err := wait.PollImmediate(5*time.Second, 30*time.Second, func() (bool, error) { - _, err := db.Exec(tmt) - if err != nil { - glog.Warningf("[%s] exec sql [%s] failed, err: %v, retry...", c, tmt, err) - return false, nil - } - - return true, nil - }) - - if err != nil { - glog.Errorf("[%s] exec sql [%s] failed, err: %v", c, tmt, err) - return err - } - } - - return nil -} - -// Start starts to run cases -func (c *BlockWriterCase) Start(db *sql.DB) error { - if !atomic.CompareAndSwapUint32(&c.isRunning, 0, 1) { - err := fmt.Errorf("[%s] is running, you can't start it again", c) - glog.Error(err) - return err - } - - defer func() { - c.RLock() - glog.Infof("[%s] stopped", c) - atomic.SwapUint32(&c.isRunning, 0) - }() - - if c.isInit == 0 { - if err := c.initialize(db); err != nil { - return err - } - } - - glog.Infof("[%s] start to execute case...", c) - - var wg sync.WaitGroup - - ctx, cancel := context.WithCancel(context.Background()) - - queryChan := make(chan []string, queryChanSize) - - for i := 0; i < c.cfg.Concurrency; i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - c.bws[i].run(ctx, db, queryChan) - }(i) - } - - wg.Add(1) - go c.generateQuery(ctx, queryChan, &wg) - -loop: - for { - select { - case <-c.stopChan: - glog.Infof("[%s] stoping...", c) - cancel() - break loop - default: - util.Sleep(context.Background(), 2*time.Second) - } - } - - wg.Wait() - close(queryChan) - - return nil -} - -// Stop stops cases -func (c *BlockWriterCase) Stop() { - c.stopChan <- struct{}{} -} - -// String implements fmt.Stringer interface. -func (c *BlockWriterCase) String() string { - return "block_writer" -} From aa70d1863d8dd8317de44ca133bcbe9042431790 Mon Sep 17 00:00:00 2001 From: shuijing198799 Date: Tue, 19 Mar 2019 14:28:35 +0800 Subject: [PATCH 6/6] resolve some merge error and naming conflict --- tests/pkg/blockwriter/blockwriter.go | 273 +++++++++++++++++++++++++++ 1 file changed, 273 insertions(+) create mode 100644 tests/pkg/blockwriter/blockwriter.go diff --git a/tests/pkg/blockwriter/blockwriter.go b/tests/pkg/blockwriter/blockwriter.go new file mode 100644 index 0000000000..8434f151b0 --- /dev/null +++ b/tests/pkg/blockwriter/blockwriter.go @@ -0,0 +1,273 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License.package spec + +package blockwriter + +import ( + "context" + "database/sql" + "fmt" + "math/rand" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/golang/glog" + "github.com/pingcap/tidb-operator/tests/pkg/util" + "k8s.io/apimachinery/pkg/util/wait" +) + +const ( + queryChanSize int = 10000 +) + +// BlockWriterCase is for concurrent writing blocks. 
+type BlockWriterCase struct { + cfg Config + bws []*blockWriter + + isRunning uint32 + isInit uint32 + stopChan chan struct{} + + sync.RWMutex +} + +// Config defines the config of BlockWriterCase +type Config struct { + TableNum int + Concurrency int + BatchSize int + RawSize int +} + +type blockWriter struct { + rawSize int + values []string + batchSize int +} + +// NewBlockWriterCase returns the BlockWriterCase. +func NewBlockWriterCase(cfg Config) *BlockWriterCase { + c := &BlockWriterCase{ + cfg: cfg, + stopChan: make(chan struct{}, 1), + } + + if c.cfg.TableNum < 1 { + c.cfg.TableNum = 1 + } + c.initBlocks() + + return c +} + +func (c *BlockWriterCase) initBlocks() { + c.bws = make([]*blockWriter, c.cfg.Concurrency) + for i := 0; i < c.cfg.Concurrency; i++ { + c.bws[i] = c.newBlockWriter() + } +} + +func (c *BlockWriterCase) newBlockWriter() *blockWriter { + return &blockWriter{ + rawSize: c.cfg.RawSize, + values: make([]string, c.cfg.BatchSize), + batchSize: c.cfg.BatchSize, + } +} + +func (c *BlockWriterCase) generateQuery(ctx context.Context, queryChan chan []string, wg *sync.WaitGroup) { + defer func() { + glog.Infof("[%s] [action: generate Query] stopped", c) + wg.Done() + }() + + for { + tableN := rand.Intn(c.cfg.TableNum) + var index string + if tableN > 0 { + index = fmt.Sprintf("%d", tableN) + } + + var querys []string + for i := 0; i < 100; i++ { + values := make([]string, c.cfg.BatchSize) + for i := 0; i < c.cfg.BatchSize; i++ { + blockData := util.RandString(c.cfg.RawSize) + values[i] = fmt.Sprintf("('%s')", blockData) + } + + querys = append(querys, fmt.Sprintf( + "INSERT INTO block_writer%s(raw_bytes) VALUES %s", + index, strings.Join(values, ","))) + } + + select { + case <-ctx.Done(): + return + default: + if len(queryChan) < queryChanSize { + queryChan <- querys + } else { + glog.Infof("[%s] [action: generate Query] query channel is full, sleep 10 seconds", c) + util.Sleep(ctx, 10*time.Second) + } + } + } +} + +func (bw *blockWriter) batchExecute(db *sql.DB, query string) error { + _, err := db.Exec(query) + if err != nil { + glog.Errorf("[block_writer] exec sql [%s] failed, err: %v", query, err) + return err + } + + return nil +} + +func (bw *blockWriter) run(ctx context.Context, db *sql.DB, queryChan chan []string) { + for { + select { + case <-ctx.Done(): + return + default: + } + + querys, ok := <-queryChan + if !ok { + // No more query + return + } + + for _, query := range querys { + select { + case <-ctx.Done(): + return + default: + if err := bw.batchExecute(db, query); err != nil { + glog.Fatal(err) + } + } + } + } +} + +// Initialize inits case +func (c *BlockWriterCase) initialize(db *sql.DB) error { + glog.Infof("[%s] start to init...", c) + defer func() { + atomic.StoreUint32(&c.isInit, 1) + glog.Infof("[%s] init end...", c) + }() + + for i := 0; i < c.cfg.TableNum; i++ { + var s string + if i > 0 { + s = fmt.Sprintf("%d", i) + } + + tmt := fmt.Sprintf("CREATE TABLE IF NOT EXISTS block_writer%s %s", s, ` + ( + id BIGINT NOT NULL AUTO_INCREMENT, + raw_bytes BLOB NOT NULL, + PRIMARY KEY (id) +)`) + + err := wait.PollImmediate(5*time.Second, 30*time.Second, func() (bool, error) { + _, err := db.Exec(tmt) + if err != nil { + glog.Warningf("[%s] exec sql [%s] failed, err: %v, retry...", c, tmt, err) + return false, nil + } + + return true, nil + }) + + if err != nil { + glog.Errorf("[%s] exec sql [%s] failed, err: %v", c, tmt, err) + return err + } + } + + return nil +} + +// Start starts to run cases +func (c *BlockWriterCase) Start(db *sql.DB) error { + if 
!atomic.CompareAndSwapUint32(&c.isRunning, 0, 1) { + err := fmt.Errorf("[%s] is running, you can't start it again", c) + glog.Error(err) + return err + } + + defer func() { + c.RLock() + glog.Infof("[%s] stopped", c) + atomic.SwapUint32(&c.isRunning, 0) + }() + + if c.isInit == 0 { + if err := c.initialize(db); err != nil { + return err + } + } + + glog.Infof("[%s] start to execute case...", c) + + var wg sync.WaitGroup + + ctx, cancel := context.WithCancel(context.Background()) + + queryChan := make(chan []string, queryChanSize) + + for i := 0; i < c.cfg.Concurrency; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + c.bws[i].run(ctx, db, queryChan) + }(i) + } + + wg.Add(1) + go c.generateQuery(ctx, queryChan, &wg) + +loop: + for { + select { + case <-c.stopChan: + glog.Infof("[%s] stoping...", c) + cancel() + break loop + default: + util.Sleep(context.Background(), 2*time.Second) + } + } + + wg.Wait() + close(queryChan) + + return nil +} + +// Stop stops cases +func (c *BlockWriterCase) Stop() { + c.stopChan <- struct{}{} +} + +// String implements fmt.Stringer interface. +func (c *BlockWriterCase) String() string { + return "block_writer" +}
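
Taken together, the later patches in this series converge on one pattern for verifying the scheduled backup: list the Jobs in the namespace, keep only those whose controller owner reference points at the <clusterName>-scheduled-backup CronJob, and poll until they have succeeded. The sketch below isolates that pattern on its own; it is a simplified, illustrative variant (the package name is hypothetical, and CheckScheduledBackup in the patch additionally waits for every child Job to succeed and then counts the backup directories on the PVC via the get-backup-dir pod). It uses the same pre-context client-go call style as the diff above.

package backuputil // illustrative package name, not part of the patch

import (
	"fmt"
	"time"

	"github.com/golang/glog"
	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

// parentCronJobUID returns the UID of the CronJob that controls the Job, if any,
// mirroring getParentUIDFromJob in tests/actions.go.
func parentCronJobUID(j batchv1.Job) (types.UID, bool) {
	ref := metav1.GetControllerOf(&j)
	if ref == nil || ref.Kind != "CronJob" {
		return "", false
	}
	return ref.UID, true
}

// waitForScheduledBackup polls until at least one Job owned by the
// <clusterName>-scheduled-backup CronJob has succeeded.
func waitForScheduledBackup(kubeCli kubernetes.Interface, ns, clusterName string) error {
	cronJobName := fmt.Sprintf("%s-scheduled-backup", clusterName)
	return wait.Poll(10*time.Second, 10*time.Minute, func() (bool, error) {
		cronJob, err := kubeCli.BatchV1beta1().CronJobs(ns).Get(cronJobName, metav1.GetOptions{})
		if err != nil {
			glog.Errorf("failed to get cronjob %s/%s: %v", ns, cronJobName, err)
			return false, nil // transient error: keep polling
		}

		jobs, err := kubeCli.BatchV1().Jobs(ns).List(metav1.ListOptions{})
		if err != nil {
			glog.Errorf("failed to list jobs in namespace %s: %v", ns, err)
			return false, nil
		}

		// A Job counts only if its controller owner reference is the backup CronJob.
		for _, j := range jobs.Items {
			if uid, ok := parentCronJobUID(j); ok && uid == cronJob.UID && j.Status.Succeeded > 0 {
				return true, nil
			}
		}
		return false, nil
	})
}

With the defaults used in tests/cmd/e2e/main.go, this would be invoked as waitForScheduledBackup(kubeCli, "tidb", "demo").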