diff --git a/.drone.yml b/.drone.yml index 60ed08f0a..e73381a32 100644 --- a/.drone.yml +++ b/.drone.yml @@ -140,6 +140,8 @@ pipeline: e2e-tests: image: quay.io/presslabs/bfc + secrets: + - GOOGLE_CREDENTIALS environment: - APP_VERSION=${DRONE_TAG} - KUBECONFIG=/root/go/.kube/config diff --git a/pkg/sidecar/apptakebackup/apptakebackup.go b/pkg/sidecar/apptakebackup/apptakebackup.go index cea710dfc..2aafb92f0 100644 --- a/pkg/sidecar/apptakebackup/apptakebackup.go +++ b/pkg/sidecar/apptakebackup/apptakebackup.go @@ -67,16 +67,16 @@ func pushBackupFromTo(srcHost, destBucket string) error { return fmt.Errorf("rclone start error: %s", err) } - log.V(2).Info("wait for gzip to finish") - if err := gzip.Wait(); err != nil { - return fmt.Errorf("gzip wait error: %s", err) - } - log.V(2).Info("wait for rclone to finish") if err := rclone.Wait(); err != nil { return fmt.Errorf("rclone wait error: %s", err) } + log.V(2).Info("wait for gzip to finish") + if err := gzip.Wait(); err != nil { + return fmt.Errorf("gzip wait error: %s", err) + } + return nil } diff --git a/test/e2e/backups/backups.go b/test/e2e/backups/backups.go new file mode 100644 index 000000000..75509a4d8 --- /dev/null +++ b/test/e2e/backups/backups.go @@ -0,0 +1,190 @@ +/* +Copyright 2018 Pressinfra SRL + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "context" + "fmt" + "math/rand" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + batchv1 "k8s.io/api/batch/v1" + core "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + + api "github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1" + "github.com/presslabs/mysql-operator/test/e2e/framework" +) + +const ( + POLLING = 2 * time.Second +) + +var ( + one = int32(1) + two = int32(2) +) + +var _ = Describe("Mysql backups tests", func() { + f := framework.NewFramework("mc-1") + + var ( + cluster *api.MysqlCluster + clusterKey types.NamespacedName + secret *core.Secret + backupSecret *core.Secret + bucketName string + timeout time.Duration + rootPwd string + testData string + ) + + BeforeEach(func() { + // be careful, mysql allowed hostname lenght is <63 + name := fmt.Sprintf("cl-%d", rand.Int31()/1000) + rootPwd = fmt.Sprintf("pw-%d", rand.Int31()) + bucketName = framework.GetBucketName() + timeout = 350 * time.Second + + By("creating a new cluster secret") + secret = framework.NewClusterSecret(name, f.Namespace.Name, rootPwd) + Expect(f.Client.Create(context.TODO(), secret)).To(Succeed(), "create cluster secret") + + By("create a new backup secret") + backupSecret = f.NewGCSBackupSecret() + Expect(f.Client.Create(context.TODO(), backupSecret)).To(Succeed(), "create backup secret") + + By("creating a new cluster") + cluster = framework.NewCluster(name, f.Namespace.Name) + clusterKey = types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace} + cluster.Spec.BackupSecretName = backupSecret.Name + Expect(f.Client.Create(context.TODO(), cluster)).To(Succeed(), + "failed to create cluster '%s'", cluster.Name) + + By("testing the cluster readiness") + Eventually(f.RefreshClusterFn(cluster), f.Timeout, POLLING).Should( + framework.HaveClusterReplicas(1)) + Eventually(f.RefreshClusterFn(cluster), f.Timeout, POLLING).Should( + framework.HaveClusterCond(api.ClusterConditionReady, core.ConditionTrue)) + + // refresh cluster + Expect(f.Client.Get(context.TODO(), clusterKey, cluster)).To(Succeed(), + "failed to get cluster %s", cluster.Name) + + // write test data to the cluster + testData = f.WriteSQLTest(cluster, 0, rootPwd) + }) + + Describe("tests with a good backup", func() { + var ( + backup *api.MysqlBackup + ) + BeforeEach(func() { + By("create a backup for cluster") + backup = framework.NewBackup(cluster, bucketName) + Expect(f.Client.Create(context.TODO(), backup)).To(Succeed()) + }) + + AfterEach(func() { + // delete the backup that was created + f.Client.Delete(context.TODO(), backup) + }) + + It("should be successful and can be restored", func() { + // check that the data is in the cluster + actual := f.ReadSQLTest(cluster, 0, rootPwd) + Expect(actual).To(Equal(testData)) + + By("check backup was successful") + // should be backup successfully + Eventually(f.RefreshBackupFn(backup), timeout, POLLING).Should( + framework.BackupCompleted()) + Eventually(f.RefreshBackupFn(backup), timeout, POLLING).Should( + framework.HaveBackupCond(api.BackupComplete, core.ConditionTrue)) + + backup = f.RefreshBackupFn(backup)() + + By("create a new cluster from init bucket") + // create cluster secret + name := fmt.Sprintf("cl-%d-2", rand.Int31()/1000) + sct := framework.NewClusterSecret(name, f.Namespace.Name, rootPwd) + Expect(f.Client.Create(context.TODO(), sct)).To(Succeed(), "create cluster secret") + + // create cluster + cl := framework.NewCluster(name, f.Namespace.Name) + cl.Spec.InitBucketSecretName = backupSecret.Name + cl.Spec.InitBucketURI = backup.Spec.BackupURL + Expect(f.Client.Create(context.TODO(), cl)).To(Succeed(), + "failed to create cluster '%s'", cluster.Name) + + // wait for cluster to be ready + Eventually(f.RefreshClusterFn(cl), f.Timeout, POLLING).Should( + framework.HaveClusterReplicas(1)) + Eventually(f.RefreshClusterFn(cl), f.Timeout, POLLING).Should( + framework.HaveClusterCond(api.ClusterConditionReady, core.ConditionTrue)) + + // check the data that was read before + actual = f.ReadSQLTest(cl, 0, rootPwd) + Expect(actual).To(Equal(testData)) + + }) + }) + + It("should failed the backup if bucket does not exists", func() { + backup := framework.NewBackup(cluster, "gs://does_not_exist") + Expect(f.Client.Create(context.TODO(), backup)).To(Succeed()) + + localTimeout := 150 * time.Second + // checks for the job because the backup is updated after the job is + // marked as failed + Eventually(func() *batchv1.Job { + j := &batchv1.Job{} + key := types.NamespacedName{ + Name: framework.GetNameForJob(backup), + Namespace: backup.Namespace, + } + f.Client.Get(context.TODO(), key, j) + return j + }, localTimeout, POLLING).Should(WithTransform( + func(j *batchv1.Job) int32 { return j.Status.Failed }, BeNumerically(">", 2))) + }) + + It("should take the backup from replica", func() { + By("scale up the cluster") + // scale cluster up + cluster = f.RefreshClusterFn(cluster)() + replicas := int32(2) + cluster.Spec.Replicas = &replicas + Expect(f.Client.Update(context.TODO(), cluster)).To(Succeed()) + Eventually(f.RefreshClusterFn(cluster), f.Timeout, POLLING).Should( + framework.HaveClusterReplicas(int(replicas))) + Eventually(f.RefreshClusterFn(cluster), f.Timeout, POLLING).Should( + framework.HaveClusterCond(api.ClusterConditionReady, core.ConditionTrue)) + + By("create a new backup") + backup := framework.NewBackup(cluster, bucketName) + Expect(f.Client.Create(context.TODO(), backup)).To(Succeed()) + + // checks + Eventually(f.RefreshBackupFn(backup), timeout, POLLING).Should( + framework.BackupCompleted()) + Eventually(f.RefreshBackupFn(backup), timeout, POLLING).Should( + framework.HaveBackupCond(api.BackupComplete, core.ConditionTrue)) + }) +}) diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 17d459722..61300ab58 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -22,6 +22,7 @@ import ( "github.com/presslabs/mysql-operator/test/e2e/framework" // test sources + _ "github.com/presslabs/mysql-operator/test/e2e/backups" _ "github.com/presslabs/mysql-operator/test/e2e/cluster" ) diff --git a/test/e2e/framework/backup_util.go b/test/e2e/framework/backup_util.go new file mode 100644 index 000000000..802a8bdf5 --- /dev/null +++ b/test/e2e/framework/backup_util.go @@ -0,0 +1,110 @@ +/* +Copyright 2018 Pressinfra SRL + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "context" + "fmt" + "os" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + . "github.com/onsi/gomega" + . "github.com/onsi/gomega/gstruct" + gomegatypes "github.com/onsi/gomega/types" + + api "github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1" +) + +func GetBucketName() string { + bucket := os.Getenv("BACKUP_BUCKET_NAME") + if len(bucket) == 0 { + Logf("BACKUP_BUEKET_NAME not set! Backups tests will not work") + } + return fmt.Sprintf("gs://%s", bucket) +} + +func (f *Framework) NewGCSBackupSecret() *corev1.Secret { + json_key := os.Getenv("GOOGLE_CREDENTIALS") + if json_key == "" { + Logf("GOOGLE_CREDENTIALS is not set! Backups tests will not work") + } + + return &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("backup-secret-%s", f.BaseName), + Namespace: f.Namespace.Name, + }, + StringData: map[string]string{ + "GCS_SERVICE_ACCOUNT_JSON_KEY": json_key, + }, + } +} + +func NewBackup(cluster *api.MysqlCluster, bucket string) *api.MysqlBackup { + return &api.MysqlBackup{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("bk-%s", cluster.Name), + Namespace: cluster.Namespace, + }, + Spec: api.MysqlBackupSpec{ + ClusterName: cluster.Name, + BackupURL: fmt.Sprintf("%s/%s", bucket, cluster.Name), + // the secret is specified on the cluster, no need to specify it here. + }, + } +} + +func (f *Framework) RefreshBackupFn(backup *api.MysqlBackup) func() *api.MysqlBackup { + return func() *api.MysqlBackup { + key := types.NamespacedName{ + Name: backup.Name, + Namespace: backup.Namespace, + } + b := &api.MysqlBackup{} + f.Client.Get(context.TODO(), key, b) + return b + } +} + +// BackupCompleted a matcher to check cluster complition +func BackupCompleted() gomegatypes.GomegaMatcher { + return PointTo(MatchFields(IgnoreExtras, Fields{ + "Status": MatchFields(IgnoreExtras, Fields{ + "Completed": Equal(true), + }), + })) +} + +// HaveBackupCond is a helper func that returns a matcher to check for an existing condition in a ClusterCondition list. +func HaveBackupCond(condType api.BackupConditionType, status corev1.ConditionStatus) gomegatypes.GomegaMatcher { + return PointTo(MatchFields(IgnoreExtras, Fields{ + "Status": MatchFields(IgnoreExtras, Fields{ + "Conditions": ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(condType), + "Status": Equal(status), + })), + })}, + )) +} + +// GetNameForJob returns the job name of a backup +func GetNameForJob(backup *api.MysqlBackup) string { + return fmt.Sprintf("%s-bjob", backup.Name) +} diff --git a/test/e2e/framework/cluster_util.go b/test/e2e/framework/cluster_util.go index f16f159dc..7159fd2f5 100644 --- a/test/e2e/framework/cluster_util.go +++ b/test/e2e/framework/cluster_util.go @@ -25,15 +25,18 @@ import ( "database/sql" kutil_pf "github.com/appscode/kutil/tools/portforward" - core "k8s.io/api/core/v1" - meta "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/uuid" k8score "k8s.io/client-go/kubernetes/typed/core/v1" _ "github.com/go-sql-driver/mysql" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" . "github.com/onsi/gomega/gstruct" + gomegatypes "github.com/onsi/gomega/types" api "github.com/presslabs/mysql-operator/pkg/apis/mysql/v1alpha1" ) @@ -43,7 +46,7 @@ var ( ) func (f *Framework) ClusterEventuallyCondition(cluster *api.MysqlCluster, - condType api.ClusterConditionType, status core.ConditionStatus, timeout time.Duration) { + condType api.ClusterConditionType, status corev1.ConditionStatus, timeout time.Duration) { Eventually(func() []api.ClusterCondition { key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace} if err := f.Client.Get(context.TODO(), key, cluster); err != nil { @@ -58,7 +61,7 @@ func (f *Framework) ClusterEventuallyCondition(cluster *api.MysqlCluster, } func (f *Framework) NodeEventuallyCondition(cluster *api.MysqlCluster, nodeName string, - condType api.NodeConditionType, status core.ConditionStatus, timeout time.Duration) { + condType api.NodeConditionType, status corev1.ConditionStatus, timeout time.Duration) { Eventually(func() []api.NodeCondition { key := types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace} if err := f.Client.Get(context.TODO(), key, cluster); err != nil { @@ -95,17 +98,18 @@ func (f *Framework) ExecSQLOnNode(cluster *api.MysqlCluster, i int, user, passwo dsn := fmt.Sprintf("%s:%s@tcp(localhost:%d)/?timeout=10s&multiStatements=true", user, password, tunnel.Local) db, err := sql.Open("mysql", dsn) - Expect(err).NotTo(HaveOccurred(), "Failed connection to mysql DSN: %s", dsn) + Expect(err).To(Succeed(), "Failed connection to mysql DSN: %s", dsn) rows, err := db.Query(query) - Expect(err).NotTo(HaveOccurred(), "Query failed: %s", query) + Expect(err).To(Succeed(), "Query failed: %s", query) + tunnel.Close() return rows } -func (f *Framework) GetPodForNode(cluster *api.MysqlCluster, i int) *core.Pod { +func (f *Framework) GetPodForNode(cluster *api.MysqlCluster, i int) *corev1.Pod { selector := labels.SelectorFromSet(cluster.GetLabels()) - podList, err := f.ClientSet.CoreV1().Pods(cluster.Namespace).List(meta.ListOptions{ + podList, err := f.ClientSet.CoreV1().Pods(cluster.Namespace).List(metav1.ListOptions{ LabelSelector: selector.String(), }) Expect(err).NotTo(HaveOccurred(), "Failed listing pods for cluster '%s'", cluster.Name) @@ -143,3 +147,84 @@ func GetNameForResource(name string, cluster *api.MysqlCluster) string { return fmt.Sprintf("%s-mysql", cluster.Name) } } + +// HaveClusterCond is a helper func that returns a matcher to check for an existing condition in a ClusterCondition list. +func HaveClusterCond(condType api.ClusterConditionType, status corev1.ConditionStatus) gomegatypes.GomegaMatcher { + return PointTo(MatchFields(IgnoreExtras, Fields{ + "Status": MatchFields(IgnoreExtras, Fields{ + "Conditions": ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(condType), + "Status": Equal(status), + })), + })}, + )) +} + +func (f *Framework) RefreshClusterFn(cluster *api.MysqlCluster) func() *api.MysqlCluster { + return func() *api.MysqlCluster { + key := types.NamespacedName{ + Name: cluster.Name, + Namespace: cluster.Namespace, + } + c := &api.MysqlCluster{} + f.Client.Get(context.TODO(), key, c) + return c + } +} + +// HaveClusterRepliacs matcher for replicas +func HaveClusterReplicas(replicas int) gomegatypes.GomegaMatcher { + return PointTo(MatchFields(IgnoreExtras, Fields{ + "Status": MatchFields(IgnoreExtras, Fields{ + "ReadyNodes": Equal(replicas), + }), + })) +} + +var ( + testDBName = "op_test_w" + testTableName = "op_table" +) + +func (f *Framework) WriteSQLTest(cluster *api.MysqlCluster, pod int, pw string) string { + By("run write SQL test to cluster") + + // create database + f.ExecSQLOnNode(cluster, pod, "root", pw, + fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", testDBName), + ) + + // create table + f.ExecSQLOnNode(cluster, pod, "root", pw, fmt.Sprintf( + `USE %s; CREATE TABLE IF NOT EXISTS %s + (k varchar(20) NOT NULL, v varchar(36) NOT NULL, PRIMARY KEY (k));`, + testDBName, testTableName, + )) + + // insert data + data := string(uuid.NewUUID()) + f.ExecSQLOnNode(cluster, pod, "root", pw, fmt.Sprintf( + `USE %s; INSERT INTO %s (k, v) VALUES ("data", "%s") + ON DUPLICATE KEY UPDATE k="data", v="%[3]s";`, + testDBName, testTableName, data, + )) + + return data +} + +func (f *Framework) ReadSQLTest(cluster *api.MysqlCluster, pod int, pw string) string { + By("run read SQL test") + var data string + + rows := f.ExecSQLOnNode(cluster, pod, "root", pw, fmt.Sprintf( + `SELECT v FROM %s.%s WHERE k="data"`, + testDBName, testTableName, + )) + defer rows.Close() + + if rows.Next() { + rows.Scan(&data) + } + + return data +} diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 65b602d5c..ec0fefea9 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -7,12 +7,10 @@ import ( "time" kcore "github.com/appscode/kutil/core/v1" - "k8s.io/api/core/v1" - core "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" - // apierrs "k8s.io/apimachinery/pkg/api/errors" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" @@ -34,11 +32,11 @@ var log = logf.Log.WithName("framework.util") // CreateTestingNS should be used by every test, note that we append a common prefix to the provided test name. // Please see NewFramework instead of using this directly. -func CreateTestingNS(baseName string, c clientset.Interface, labels map[string]string) (*v1.Namespace, error) { +func CreateTestingNS(baseName string, c clientset.Interface, labels map[string]string) (*corev1.Namespace, error) { if labels == nil { labels = map[string]string{} } - namespaceObj := &v1.Namespace{ + namespaceObj := &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ // use a short name because long names produce long hostnames but // maximum allowed length by mysql is 60. @@ -47,10 +45,10 @@ func CreateTestingNS(baseName string, c clientset.Interface, labels map[string]s Namespace: "", Labels: labels, }, - Status: v1.NamespaceStatus{}, + Status: corev1.NamespaceStatus{}, } // Be robust about making the namespace creation call. - var got *v1.Namespace + var got *corev1.Namespace if err := wait.PollImmediate(Poll, 30*time.Second, func() (bool, error) { var err error got, err = c.CoreV1().Namespaces().Create(namespaceObj) @@ -172,7 +170,7 @@ func getPodLogsInternal(c clientset.Interface, namespace, podName, containerName return string(logs), err } -func kubectlLogPod(c clientset.Interface, pod v1.Pod, containerNameSubstr string, logFunc func(ftm string, args ...interface{})) { +func kubectlLogPod(c clientset.Interface, pod corev1.Pod, containerNameSubstr string, logFunc func(ftm string, args ...interface{})) { for _, container := range pod.Spec.Containers { if strings.Contains(container.Name, containerNameSubstr) { // Contains() matches all strings if substr is empty @@ -229,8 +227,8 @@ func NewCluster(name, ns string) *api.MysqlCluster { } -func NewClusterSecret(name, ns, pw string) *core.Secret { - return &core.Secret{ +func NewClusterSecret(name, ns, pw string) *corev1.Secret { + return &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: ns,