From 557a8e71512c647a163fdfb506bca508201be977 Mon Sep 17 00:00:00 2001 From: dougfales Date: Tue, 5 Nov 2019 06:24:11 -0700 Subject: [PATCH 01/13] Clone from k8s services instead of individual hosts. This change adds the SidecarServerPort to the MasterService and introduces one new service, HealthyReplicasService, so that we can try to clone from replicas first, then fall back to master. * Added a test suite for RunCloneCommand logic, along with a mock backup server. * Added checks for service availability when cloning. * Added "fail fast" logic when unexpected errors occur during cloning/download. * Added dataDir cleanup code so that interrupted cloning does not leave dataDir in an inconsistent state. * Added e2e test demonstrating cloning failure when PVC is removed and pod recreated. * Changed the connect timeout from the default of 30s to 5s so that an empty k8s service will not cause cloning attempts to hang unnecessarily for 30s. --- .../syncer/healthy_replicas_service.go | 66 +++++ .../internal/syncer/master_service.go | 8 +- .../mysqlcluster/mysqlcluster_controller.go | 1 + pkg/internal/mysqlcluster/mysqlcluster.go | 6 +- pkg/sidecar/appclone.go | 118 +++++++-- pkg/sidecar/appclone_fakeserver_test.go | 135 +++++++++++ pkg/sidecar/appclone_test.go | 226 ++++++++++++++++++ pkg/sidecar/configs.go | 14 ++ pkg/sidecar/constants.go | 5 + pkg/sidecar/server.go | 35 ++- test/e2e/cluster/cluster.go | 53 +++- 11 files changed, 640 insertions(+), 27 deletions(-) create mode 100644 pkg/controller/mysqlcluster/internal/syncer/healthy_replicas_service.go create mode 100644 pkg/sidecar/appclone_fakeserver_test.go create mode 100644 pkg/sidecar/appclone_test.go diff --git a/pkg/controller/mysqlcluster/internal/syncer/healthy_replicas_service.go b/pkg/controller/mysqlcluster/internal/syncer/healthy_replicas_service.go new file mode 100644 index 000000000..01ad64665 --- /dev/null +++ b/pkg/controller/mysqlcluster/internal/syncer/healthy_replicas_service.go @@ -0,0 +1,66 @@ +/* +Copyright 2018 Pressinfra SRL + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package mysqlcluster + +import ( + "github.com/presslabs/controller-util/syncer" + core "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/presslabs/mysql-operator/pkg/internal/mysqlcluster" +) + +// NewHealthyReplicasSVCSyncer returns a service syncer for healthy replicas service +func NewHealthyReplicasSVCSyncer(c client.Client, scheme *runtime.Scheme, cluster *mysqlcluster.MysqlCluster) syncer.Interface { + obj := &core.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: cluster.GetNameForResource(mysqlcluster.HealthyReplicasService), + Namespace: cluster.Namespace, + }, + } + + return syncer.NewObjectSyncer("HealthyReplicasSVC", cluster.Unwrap(), obj, c, scheme, func(in runtime.Object) error { + out := in.(*core.Service) + + // set service labels + out.Labels = cluster.GetLabels() + out.Labels["mysql.presslabs.org/service-type"] = "ready-replicas" + + // set selectors for healthy replica (non-master) mysql pods only + out.Spec.Selector = cluster.GetSelectorLabels() + out.Spec.Selector["role"] = "replica" + out.Spec.Selector["healthy"] = "yes" + + if len(out.Spec.Ports) != 2 { + out.Spec.Ports = make([]core.ServicePort, 2) + } + + out.Spec.Ports[0].Name = MysqlPortName + out.Spec.Ports[0].Port = MysqlPort + out.Spec.Ports[0].TargetPort = TargetPort + out.Spec.Ports[0].Protocol = core.ProtocolTCP + + out.Spec.Ports[1].Name = SidecarServerPortName + out.Spec.Ports[1].Port = SidecarServerPort + out.Spec.Ports[1].Protocol = core.ProtocolTCP + + return nil + }) + +} diff --git a/pkg/controller/mysqlcluster/internal/syncer/master_service.go b/pkg/controller/mysqlcluster/internal/syncer/master_service.go index 298f1f91c..41cc7975a 100644 --- a/pkg/controller/mysqlcluster/internal/syncer/master_service.go +++ b/pkg/controller/mysqlcluster/internal/syncer/master_service.go @@ -46,14 +46,18 @@ func NewMasterSVCSyncer(c client.Client, scheme *runtime.Scheme, cluster *mysqlc out.Spec.Selector = cluster.GetSelectorLabels() out.Spec.Selector["role"] = "master" - if len(out.Spec.Ports) != 1 { - out.Spec.Ports = make([]core.ServicePort, 1) + if len(out.Spec.Ports) != 2 { + out.Spec.Ports = make([]core.ServicePort, 2) } out.Spec.Ports[0].Name = MysqlPortName out.Spec.Ports[0].Port = MysqlPort out.Spec.Ports[0].TargetPort = TargetPort out.Spec.Ports[0].Protocol = core.ProtocolTCP + out.Spec.Ports[1].Name = SidecarServerPortName + out.Spec.Ports[1].Port = SidecarServerPort + out.Spec.Ports[1].Protocol = core.ProtocolTCP + return nil }) diff --git a/pkg/controller/mysqlcluster/mysqlcluster_controller.go b/pkg/controller/mysqlcluster/mysqlcluster_controller.go index dd426ea18..57588c0ba 100644 --- a/pkg/controller/mysqlcluster/mysqlcluster_controller.go +++ b/pkg/controller/mysqlcluster/mysqlcluster_controller.go @@ -214,6 +214,7 @@ func (r *ReconcileMysqlCluster) Reconcile(request reconcile.Request) (reconcile. 
 		clustersyncer.NewHeadlessSVCSyncer(r.Client, r.scheme, cluster),
 		clustersyncer.NewMasterSVCSyncer(r.Client, r.scheme, cluster),
 		clustersyncer.NewHealthySVCSyncer(r.Client, r.scheme, cluster),
+		clustersyncer.NewHealthyReplicasSVCSyncer(r.Client, r.scheme, cluster),
 		clustersyncer.NewStatefulSetSyncer(r.Client, r.scheme, cluster, cmRev, sctRev, r.opt),
 	}
 
diff --git a/pkg/internal/mysqlcluster/mysqlcluster.go b/pkg/internal/mysqlcluster/mysqlcluster.go
index cd538fe96..dafc310c3 100644
--- a/pkg/internal/mysqlcluster/mysqlcluster.go
+++ b/pkg/internal/mysqlcluster/mysqlcluster.go
@@ -111,7 +111,9 @@ const (
 	ConfigMap ResourceName = "config-files"
 	// MasterService is the name of the service that points to master node
 	MasterService ResourceName = "master-service"
-	// HealthyNodesService is the name of a service that continas all healthy nodes
+	// HealthyReplicasService is the name of a service that points to healthy replicas (excludes master)
+	HealthyReplicasService ResourceName = "healthy-replicas-service"
+	// HealthyNodesService is the name of a service that contains all healthy nodes
 	HealthyNodesService ResourceName = "healthy-nodes-service"
 	// PodDisruptionBudget is the name of pod disruption budget for the stateful set
 	PodDisruptionBudget ResourceName = "pdb"
@@ -131,6 +133,8 @@ func GetNameForResource(name ResourceName, clusterName string) string {
 		return fmt.Sprintf("%s-mysql", clusterName)
 	case MasterService:
 		return fmt.Sprintf("%s-mysql-master", clusterName)
+	case HealthyReplicasService:
+		return fmt.Sprintf("%s-mysql-replicas", clusterName)
 	case HeadlessSVC:
 		return HeadlessSVCName
 	case OldHeadlessSVC:
diff --git a/pkg/sidecar/appclone.go b/pkg/sidecar/appclone.go
index 9b1376dfb..70331c7f0 100644
--- a/pkg/sidecar/appclone.go
+++ b/pkg/sidecar/appclone.go
@@ -18,12 +18,40 @@ package sidecar
 
 import (
 	"fmt"
+	"io/ioutil"
+	"net/http"
 	"os"
 	"os/exec"
+	"path"
 	"strings"
 )
 
-// RunCloneCommand clone the data from source.
+// RunCloneCommand clones the data from several potential sources.
+//
+// There are a few possible scenarios that this function tries to handle:
+//
+// Scenario | Action Taken
+// ------------------------------------------------------------------------------------
+// Data already exists | Log an informational message and return without error.
+// | This permits the pod to continue initializing and mysql
+// | will use the data already on the PVC.
+// ------------------------------------------------------------------------------------
+// Healthy replicas exist | We will attempt to clone from the healthy replicas.
+// | If the cloning starts but is interrupted, we will return
+// | with an error, not trying to clone from the master. The
+// | assumption is that some intermittent error caused the
+// | failure and we should let K8S restart the init container
+// | to try to clone from the replicas again.
+// ------------------------------------------------------------------------------------
+// No healthy replicas; only | We attempt to clone from the master, assuming that this
+// master exists | is the initialization of the second pod in a multi-pod
+// | cluster. If cloning starts and is interrupted, we will
+// | return with an error, letting K8S try again.
+// ------------------------------------------------------------------------------------
+// No healthy replicas; no | If there is a bucket URL to clone from, we will try that.
+// master; bucket URL exists | The assumption is that this is the bootstrap case: the
+// | very first mysql pod is being initialized.
+// ------------------------------------------------------------------------------------ func RunCloneCommand(cfg *Config) error { log.Info("cloning command", "host", cfg.Hostname) @@ -36,32 +64,65 @@ func RunCloneCommand(cfg *Config) error { return fmt.Errorf("removing lost+found: %s", err) } - if cfg.ServerID() > cfg.MyServerIDOffset { - // cloning from prior node - sourceHost := cfg.FQDNForServer(cfg.ServerID() - 1) - err := cloneFromSource(cfg, sourceHost) - if err != nil { - return fmt.Errorf("failed to clone from %s, err: %s", sourceHost, err) + if isServiceAvailable(cfg.ReplicasFQDN()) { + if err := attemptClone(cfg, cfg.ReplicasFQDN()); err != nil { + return fmt.Errorf("cloning from healthy replicas failed due to unexpected error: %s", err) + } + } else if isServiceAvailable(cfg.MasterFQDN()) { + log.Info("healthy replica service was unavailable for cloning, will attempt to clone from the master") + if err := attemptClone(cfg, cfg.MasterFQDN()); err != nil { + return fmt.Errorf("cloning from master service failed due to unexpected error: %s", err) } } else if cfg.ShouldCloneFromBucket() { // cloning from provided initBucketURL - err := cloneFromBucket(cfg.InitBucketURL) - if err != nil { + log.Info("cloning from bucket") + if err := cloneFromBucket(cfg.InitBucketURL); err != nil { return fmt.Errorf("failed to clone from bucket, err: %s", err) } } else { - log.Info("nothing to clone or init from") + log.Info("nothing to clone from: no existing data found, no replicas and no master available, and no clone bucket url found") return nil } // prepare backup - if err := xtrabackupPreperData(); err != nil { + if err := xtrabackupPrepareData(); err != nil { return err } return nil } +func isServiceAvailable(svc string) bool { + req, err := http.NewRequest("GET", prepareUrl(svc, serverProbeEndpoint), nil) + if err != nil { + log.Info("failed to check available service", "service", svc, "error", err) + return false + } + + client := &http.Client{} + client.Transport = fastTimeoutTransport(5) + resp, err := client.Do(req) + if err != nil { + log.Info("service was not available", "service", svc, "error", err) + return false + } + + if resp.StatusCode != 200 { + log.Info("service not available", "service", svc, "HTTP status code", resp.StatusCode) + return false + } + + return true +} + +func attemptClone(cfg *Config, sourceService string) error { + err := cloneFromSource(cfg, sourceService) + if err != nil { + return fmt.Errorf("failed to clone from %s, err: %s", sourceService, err) + } + return nil +} + func cloneFromBucket(initBucket string) error { initBucket = strings.Replace(initBucket, "://", ":", 1) @@ -84,7 +145,7 @@ func cloneFromBucket(initBucket string) error { // extracts files from stdin (-x) and writes them to mysql // data target dir // nolint: gosec - xbstream := exec.Command("xbstream", "-x", "-C", dataDir) + xbstream := exec.Command(xbStreamCommand, "-x", "-C", dataDir) var err error // rclone | gzip | xbstream @@ -140,11 +201,19 @@ func cloneFromSource(cfg *Config, host string) error { // extracts files from stdin (-x) and writes them to mysql // data target dir // nolint: gosec - xbstream := exec.Command("xbstream", "-x", "-C", dataDir) + xbstream := exec.Command(xbStreamCommand, "-x", "-C", dataDir) xbstream.Stdin = response.Body xbstream.Stderr = os.Stderr + cloneSucceeded := false + defer func() { + if !cloneSucceeded { + log.Info("clone operation failed, cleaning up dataDir so retries may proceed") + cleanDataDir() + } + }() + if err := xbstream.Start(); err != nil { 
return fmt.Errorf("xbstream start error: %s", err) } @@ -157,12 +226,13 @@ func cloneFromSource(cfg *Config, host string) error { return err } + cloneSucceeded = true return nil } -func xtrabackupPreperData() error { +func xtrabackupPrepareData() error { // nolint: gosec - xtbkCmd := exec.Command("xtrabackup", "--prepare", + xtbkCmd := exec.Command(xtrabackupCommand, "--prepare", fmt.Sprintf("--target-dir=%s", dataDir)) xtbkCmd.Stderr = os.Stderr @@ -171,6 +241,20 @@ func xtrabackupPreperData() error { } func deleteLostFound() error { - path := fmt.Sprintf("%s/lost+found", dataDir) - return os.RemoveAll(path) + lfPath := fmt.Sprintf("%s/lost+found", dataDir) + return os.RemoveAll(lfPath) +} + +func cleanDataDir() { + files, err := ioutil.ReadDir(dataDir) + if err != nil { + log.Error(err, "failed to clean dataDir") + } + + for _, f := range files { + toRemove := path.Join(dataDir, f.Name()) + if err := os.RemoveAll(toRemove); err != nil { + log.Error(err, "failed to remove file in dataDir") + } + } } diff --git a/pkg/sidecar/appclone_fakeserver_test.go b/pkg/sidecar/appclone_fakeserver_test.go new file mode 100644 index 000000000..a398e901b --- /dev/null +++ b/pkg/sidecar/appclone_fakeserver_test.go @@ -0,0 +1,135 @@ +package sidecar + +import ( + "context" + "fmt" + "net/http" + "time" +) + +type loggedRequest struct { + endpoint string + timestamp time.Time +} + +type fakeServer struct { + cfg *Config + server http.Server + calls []loggedRequest + simulateTruncate bool // Will cause the next request to truncate the response + simulateError bool // Will cause the next request to return http error + validXBStream []byte +} + +func newFakeServer(address string, cfg *Config) *fakeServer { + mux := http.NewServeMux() + fSrv := &fakeServer{ + cfg: cfg, + server: http.Server{ + Addr: address, + Handler: mux, + }, + } + + // A small file named "t" containing the text "fake-backup", encoded with xbstream -c + fSrv.validXBStream = []byte{ + 0x58, 0x42, 0x53, 0x54, 0x43, 0x4b, 0x30, 0x31, 0x00, 0x50, 0x01, 0x00, 0x00, 0x00, 0x74, 0x0c, // XBSTCK01.P....t. + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x6b, // ...............k + 0xcc, 0x84, 0x00, 0x66, 0x61, 0x6b, 0x65, 0x2d, 0x62, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x0a, 0x58, // ...fake-backup.X + 0x42, 0x53, 0x54, 0x43, 0x4b, 0x30, 0x31, 0x00, 0x45, 0x01, 0x00, 0x00, 0x00, 0x74, // BSTCK01.E....t. + } + + fSrv.reset() + + mux.Handle(serverProbeEndpoint, http.HandlerFunc(fSrv.healthHandler)) + mux.Handle(serverBackupEndpoint, http.HandlerFunc(fSrv.backupHandler)) + + return fSrv +} + +// Since we are starting/stopping these fake servers for individual test cases, we should wait +// for them to startup so as to avoid false positives in our tests. 
+func (fSrv *fakeServer) waitReady() error {
+	retries := 0
+	for {
+		resp, err := http.Get(prepareUrl(fSrv.server.Addr, serverProbeEndpoint))
+		if err == nil && resp.StatusCode == 200 {
+			return nil
+		}
+		if retries++; retries > 5 {
+			return fmt.Errorf("could not start fake sidecar server: %s", err)
+		}
+		time.Sleep(50 * time.Millisecond)
+	}
+}
+
+func (fSrv *fakeServer) start() error {
+	go func() {
+		err := fSrv.server.ListenAndServe()
+		if err != nil && err != http.ErrServerClosed {
+			panic("couldn't start fakeserver")
+		}
+	}()
+	return fSrv.waitReady()
+}
+
+func (fSrv *fakeServer) stop() error {
+	if err := fSrv.server.Shutdown(context.Background()); err != nil {
+		return fmt.Errorf("failed to stop appclone test server: %s", err)
+	}
+	return nil
+}
+
+func (fSrv *fakeServer) reset() {
+	fSrv.calls = make([]loggedRequest, 0)
+	fSrv.simulateError = false
+	fSrv.simulateTruncate = false
+}
+
+func (fSrv *fakeServer) backupRequestsReceived() int {
+	return fSrv.callsForEndpoint(serverBackupEndpoint)
+}
+
+func (fSrv *fakeServer) callsForEndpoint(endpoint string) int {
+	count := 0
+	for _, call := range fSrv.calls {
+		if call.endpoint == endpoint {
+			count++
+		}
+	}
+	return count
+}
+
+func (fSrv *fakeServer) healthHandler(w http.ResponseWriter, req *http.Request) {
+	fSrv.calls = append(fSrv.calls, loggedRequest{req.RequestURI, time.Now()})
+	w.WriteHeader(http.StatusOK)
+	if _, err := w.Write([]byte("OK")); err != nil {
+		log.Error(err, "failed writing response")
+	}
+}
+
+func (fSrv *fakeServer) backupHandler(w http.ResponseWriter, req *http.Request) {
+	fSrv.calls = append(fSrv.calls, loggedRequest{req.RequestURI, time.Now()})
+
+	// Error: return http status code of 500
+	if fSrv.simulateError {
+		http.Error(w, "xtrbackup failed", http.StatusInternalServerError)
+		return
+	}
+
+	w.Header().Set("Content-Type", "application/octet-stream")
+	w.Header().Set("Connection", "keep-alive")
+	w.Header().Set("Trailer", backupStatusTrailer)
+
+	backup := fSrv.validXBStream
+	// Truncate: send half the stream, with "successful" trailers
+	if fSrv.simulateTruncate {
+		backup = fSrv.validXBStream[0:10]
+	}
+
+	if _, err := w.Write(backup); err != nil {
+		log.Error(err, "failed writing response")
+	}
+
+	w.Header().Set(backupStatusTrailer, backupSuccessful)
+}
diff --git a/pkg/sidecar/appclone_test.go b/pkg/sidecar/appclone_test.go
new file mode 100644
index 000000000..3e796459f
--- /dev/null
+++ b/pkg/sidecar/appclone_test.go
@@ -0,0 +1,226 @@
+package sidecar
+
+import (
+	"fmt"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"io/ioutil"
+	"os"
+	"path"
+)
+
+var _ = Describe("Test RunCloneCommand cloning logic", func() {
+
+	var (
+		cfg               *Config
+		fakeBackupFile    string // as named in fakeServer.validXBStream
+		fakeMasterServer  *fakeServer
+		fakeReplicaServer *fakeServer
+		// Normally, these are true k8s services, each listening on
+		// SidecarServerPort. Since we can't simulate that in unit tests, we put
+		// each "service" on its own port.
+ masterServiceAddr = fmt.Sprintf(":%d", serverPort) + healthyReplicasServiceAddr = ":8081" + ) + + setupFakeDataDir := func() { + tempDataDir, err := ioutil.TempDir("", "mysql-operator-tests") + Expect(err).ToNot(HaveOccurred()) + dataDir = tempDataDir + fakeBackupFile = path.Join(dataDir, "t") + } + + teardownFakeDataDir := func() { + err := os.RemoveAll(dataDir) + Expect(err).ToNot(HaveOccurred()) + } + + startFakeServer := func(address string) *fakeServer { + fakeSrv := newFakeServer(address, cfg) + err := fakeSrv.start() + Expect(err).NotTo(HaveOccurred()) + return fakeSrv + } + + startFakeMasterService := func() { + fakeMasterServer = startFakeServer(masterServiceAddr) + } + + startFakeReplicaService := func() { + fakeReplicaServer = startFakeServer(healthyReplicasServiceAddr) + } + + stopFakeMasterService := func() { + if fakeMasterServer != nil { + err := fakeMasterServer.stop() + Expect(err).ToNot(HaveOccurred()) + } + } + + stopFakeReplicaService := func() { + if fakeReplicaServer != nil { + err := fakeReplicaServer.stop() + Expect(err).ToNot(HaveOccurred()) + } + } + + // Don't let xtrabackup try to --prepare our little fake xbstream sample or + // it will return errors. + disableXtraBackup := func() { + xtrabackupCommand = "echo" + } + + BeforeSuite(func() { + cfg = &Config{ + masterService: "localhost" + masterServiceAddr, + healthyReplicaCloneService: "localhost" + healthyReplicasServiceAddr, + } + + setupFakeDataDir() + disableXtraBackup() + }) + + AfterSuite(func() { + teardownFakeDataDir() + }) + + BeforeEach(func() { + err := os.RemoveAll(fakeBackupFile) + Expect(err).ToNot(HaveOccurred()) + cfg.ExistsMySQLData = false + startFakeReplicaService() + startFakeMasterService() + }) + + AfterEach(func() { + stopFakeMasterService() + stopFakeReplicaService() + }) + + It("should not try to clone when data already exists", func() { + cfg.ExistsMySQLData = true + + err := RunCloneCommand(cfg) + Expect(err).ToNot(HaveOccurred()) + + Expect(fakeReplicaServer.backupRequestsReceived()).To(Equal(0)) + Expect(fakeMasterServer.backupRequestsReceived()).To(Equal(0)) + + Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) + }) + + It("should request a backup and succeed ", func() { + Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) + + err := RunCloneCommand(cfg) + Expect(err).To(Succeed()) + + Expect(fakeReplicaServer.backupRequestsReceived()).To(Equal(1)) + Expect(fakeMasterServer.backupRequestsReceived()).To(Equal(0)) + + Expect(fakeBackupFile).Should(BeAnExistingFile()) + }) + + Context("with truncated xbstream data from replicas", func() { + + BeforeEach(func() { + fakeReplicaServer.simulateTruncate = true + }) + + It("cloneFromSource should clean up the data directory after failure", func() { + Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) + + err := cloneFromSource(cfg, healthyReplicasServiceAddr) + Expect(err).To(HaveOccurred()) + + Expect(fakeReplicaServer.backupRequestsReceived()).To(Equal(1)) + Expect(fakeMasterServer.backupRequestsReceived()).To(Equal(0)) + + Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) + }) + + It("should not fall back to master service", func() { + Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) + + err := RunCloneCommand(cfg) + Expect(err).To(HaveOccurred()) + + Expect(fakeReplicaServer.backupRequestsReceived()).To(Equal(1)) + Expect(fakeMasterServer.backupRequestsReceived()).To(Equal(0)) + + Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) + }) + + }) + + Context("with http error during backup", func() { + + BeforeEach(func() { 
+ fakeReplicaServer.simulateError = true + }) + + It("cloneFromSource should clean up the data directory after failure", func() { + Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) + + err := cloneFromSource(cfg, healthyReplicasServiceAddr) + Expect(err).To(HaveOccurred()) + + Expect(fakeReplicaServer.backupRequestsReceived()).To(Equal(1)) + Expect(fakeMasterServer.backupRequestsReceived()).To(Equal(0)) + + Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) + }) + + It("should not fall back to master service", func() { + Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) + + err := RunCloneCommand(cfg) + Expect(err).To(HaveOccurred()) + + Expect(fakeReplicaServer.backupRequestsReceived()).To(Equal(1)) + Expect(fakeMasterServer.backupRequestsReceived()).To(Equal(0)) + + Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) + }) + + }) + + Context("with no healthy replicas service", func() { + + BeforeEach(func() { + stopFakeReplicaService() + }) + + It("should fall back to master service and succeed", func() { + Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) + + err := RunCloneCommand(cfg) + Expect(err).ToNot(HaveOccurred()) + + Expect(fakeReplicaServer.backupRequestsReceived()).To(Equal(0)) + Expect(fakeMasterServer.backupRequestsReceived()).To(Equal(1)) + + Expect(fakeBackupFile).Should(BeAnExistingFile()) + }) + + }) + + Context("with no healthy replicas or master service", func() { + + BeforeEach(func() { + stopFakeReplicaService() + stopFakeMasterService() + }) + + It("should return nil", func() { + Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) + + err := RunCloneCommand(cfg) + Expect(err).To(Succeed()) + + Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) + }) + + }) + +}) diff --git a/pkg/sidecar/configs.go b/pkg/sidecar/configs.go index 5b23cb72e..9e5617437 100644 --- a/pkg/sidecar/configs.go +++ b/pkg/sidecar/configs.go @@ -75,6 +75,9 @@ type Config struct { // Offset for assigning MySQL Server ID MyServerIDOffset int + + masterService string + healthyReplicaCloneService string } // FQDNForServer returns the pod hostname for given MySQL server id @@ -90,9 +93,20 @@ func (cfg *Config) ClusterFQDN() string { // MasterFQDN the FQ Name of the cluster's master func (cfg *Config) MasterFQDN() string { + if cfg.masterService != "" { + return cfg.masterService + } return mysqlcluster.GetNameForResource(mysqlcluster.MasterService, cfg.ClusterName) } +// ReplicasFQDN the FQ Name of the replicas service +func (cfg *Config) ReplicasFQDN() string { + if cfg.healthyReplicaCloneService != "" { + return cfg.healthyReplicaCloneService + } + return mysqlcluster.GetNameForResource(mysqlcluster.HealthyReplicasService, cfg.ClusterName) +} + // ServerID returns the MySQL server id func (cfg *Config) ServerID() int { ordinal := getOrdinalFromHostname(cfg.Hostname) diff --git a/pkg/sidecar/constants.go b/pkg/sidecar/constants.go index c0541480c..f5c964876 100644 --- a/pkg/sidecar/constants.go +++ b/pkg/sidecar/constants.go @@ -65,6 +65,11 @@ var ( serverProbeEndpoint = constants.SidecarServerProbePath // ServerBackupEndpoint is the http server endpoint for backups serverBackupEndpoint = "/xbackup" + + // xtrabackup Executable Name + xtrabackupCommand = "xtrabackup" + // xbstream Executable Name + xbStreamCommand = "xbstream" ) const ( diff --git a/pkg/sidecar/server.go b/pkg/sidecar/server.go index 9afc98ddd..783f78c29 100644 --- a/pkg/sidecar/server.go +++ b/pkg/sidecar/server.go @@ -19,12 +19,14 @@ package sidecar import ( "context" "fmt" + 
"github.com/presslabs/mysql-operator/pkg/util/constants" "io" + "net" "net/http" "os" "os/exec" - - "github.com/presslabs/mysql-operator/pkg/util/constants" + "strings" + "time" ) const ( @@ -90,7 +92,7 @@ func (s *server) backupHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Trailer", backupStatusTrailer) // nolint: gosec - xtrabackup := exec.Command("xtrabackup", "--backup", "--slave-info", "--stream=xbstream", + xtrabackup := exec.Command(xtrabackupCommand, "--backup", "--slave-info", "--stream=xbstream", fmt.Sprintf("--tables-exclude=%s.%s", constants.OperatorDbName, constants.OperatorStatusTableName), "--host=127.0.0.1", fmt.Sprintf("--user=%s", s.cfg.ReplicationUser), fmt.Sprintf("--password=%s", s.cfg.ReplicationPassword), @@ -151,13 +153,33 @@ func maxClients(h http.Handler, n int) http.Handler { }) } +func prepareUrl(svc string, endpoint string) string { + if !strings.Contains(svc, ":") { + svc = fmt.Sprintf("%s:%d", svc, serverPort) + } + return fmt.Sprintf("http://%s%s", svc, endpoint) +} + +func fastTimeoutTransport(dialTimeout int) http.RoundTripper { + return &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: time.Duration(dialTimeout) * time.Second, + KeepAlive: 30 * time.Second, + DualStack: true, + }).DialContext, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + } +} + // requestABackup connects to specified host and endpoint and gets the backup func requestABackup(cfg *Config, host, endpoint string) (*http.Response, error) { log.Info("initialize a backup", "host", host, "endpoint", endpoint) - req, err := http.NewRequest("GET", fmt.Sprintf( - "http://%s:%d%s", host, serverPort, endpoint), nil) - + req, err := http.NewRequest("GET", prepareUrl(host, endpoint), nil) if err != nil { return nil, fmt.Errorf("fail to create request: %s", err) } @@ -166,6 +188,7 @@ func requestABackup(cfg *Config, host, endpoint string) (*http.Response, error) req.SetBasicAuth(cfg.BackupUser, cfg.BackupPassword) client := &http.Client{} + client.Transport = fastTimeoutTransport(5) resp, err := client.Do(req) if err != nil || resp.StatusCode != 200 { diff --git a/test/e2e/cluster/cluster.go b/test/e2e/cluster/cluster.go index 341b80aef..08693c081 100644 --- a/test/e2e/cluster/cluster.go +++ b/test/e2e/cluster/cluster.go @@ -113,7 +113,7 @@ var _ = Describe("Mysql cluster tests", func() { err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(podName, &meta.DeleteOptions{}) Expect(err).NotTo(HaveOccurred(), "Failed to delete pod %s", podName) - // check failover done, this is a reggression test + // check failover done, this is a regression test // TODO: decrease this timeout to 20 failoverTimeout := 60 * time.Second By(fmt.Sprintf("Check failover done; timeout=%s", failoverTimeout)) @@ -128,6 +128,44 @@ var _ = Describe("Mysql cluster tests", func() { testClusterEndpoints(f, cluster, []int{1}, []int{0, 1}) }) + It("fails over and successfully reclones master from replica", func() { + cluster.Spec.Replicas = &two + Expect(f.Client.Update(context.TODO(), cluster)).To(Succeed()) + + // test cluster to be ready + By("test cluster is ready after cluster update") + testClusterReadiness(f, cluster) + By("test cluster is registered in orchestrator after cluster update") + testClusterIsRegistredWithOrchestrator(f, cluster) + + // check cluster to have a master and a slave + By("test cluster nodes master condition is properly set") + 
f.NodeEventuallyCondition(cluster, f.GetPodHostname(cluster, 0), api.NodeConditionMaster, core.ConditionTrue, f.Timeout) + f.NodeEventuallyCondition(cluster, f.GetPodHostname(cluster, 1), api.NodeConditionMaster, core.ConditionFalse, f.Timeout) + + podName := framework.GetNameForResource("sts", cluster) + "-0" + + // delete PVC from master pod and wait for it to be removed + pvcName := "data-" + podName + deletePVCSynchronously(f, pvcName, cluster.Namespace, 15 * time.Second) + + // now delete master pod + err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(podName, &meta.DeleteOptions{}) + Expect(err).NotTo(HaveOccurred(), "Failed to delete pod %s", podName) + + failoverTimeout := 60 * time.Second + By(fmt.Sprintf("Check failover done; timeout=%s", failoverTimeout)) + f.NodeEventuallyCondition(cluster, f.GetPodHostname(cluster, 1), api.NodeConditionMaster, core.ConditionTrue, failoverTimeout) + + // after some time node 0 should be up and should be slave + By("test cluster master condition is properly set") + f.NodeEventuallyCondition(cluster, f.GetPodHostname(cluster, 0), api.NodeConditionMaster, core.ConditionFalse, f.Timeout) + f.NodeEventuallyCondition(cluster, f.GetPodHostname(cluster, 0), api.NodeConditionReplicating, core.ConditionTrue, f.Timeout) + + By("test cluster endpoints after failover") + testClusterEndpoints(f, cluster, []int{1}, []int{0, 1}) + }) + It("scale down a cluster", func() { // configure MySQL cluster to have 2 replicas and to use PV for data storage cluster.Spec.Replicas = &two @@ -255,6 +293,19 @@ var _ = Describe("Mysql cluster tests", func() { }) +func deletePVCSynchronously(f *framework.Framework, pvcName, namespace string, timeout time.Duration) { + pvc := &core.PersistentVolumeClaim{} + pvcKey := types.NamespacedName{Name: pvcName, Namespace: namespace} + Expect(f.Client.Get(context.TODO(), pvcKey, pvc)).To(Succeed(), "failed to get pvc %s", pvcName) + pvc.Finalizers = nil + Expect(f.Client.Update(context.TODO(), pvc)).To(Succeed(), "Failed to remove finalizers from pvc %s", pvcName) + Expect(f.Client.Delete(context.TODO(), pvc)).To(Succeed(), "Failed to delete pvc %s", pvcName) + pvcRemoved := fmt.Sprintf("persistentvolumeclaims \"%s\" not found", pvc.Name) + Eventually(func() error { + return f.Client.Get(context.TODO(), pvcKey, pvc) + }, timeout, POLLING).Should(MatchError(pvcRemoved), "PVC did not delete in time '%s'", pvc.Name) +} + func testClusterReadiness(f *framework.Framework, cluster *api.MysqlCluster) { timeout := f.Timeout if *cluster.Spec.Replicas > 0 { From 3d8833ae81930bf128650d1fa60f530ae0de4680 Mon Sep 17 00:00:00 2001 From: dougfales Date: Wed, 6 Nov 2019 11:43:23 -0700 Subject: [PATCH 02/13] Linter: Url -> URL. 
--- pkg/sidecar/appclone.go | 2 +- pkg/sidecar/appclone_fakeserver_test.go | 2 +- pkg/sidecar/server.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/sidecar/appclone.go b/pkg/sidecar/appclone.go index 70331c7f0..963d3e7cb 100644 --- a/pkg/sidecar/appclone.go +++ b/pkg/sidecar/appclone.go @@ -93,7 +93,7 @@ func RunCloneCommand(cfg *Config) error { } func isServiceAvailable(svc string) bool { - req, err := http.NewRequest("GET", prepareUrl(svc, serverProbeEndpoint), nil) + req, err := http.NewRequest("GET", prepareURL(svc, serverProbeEndpoint), nil) if err != nil { log.Info("failed to check available service", "service", svc, "error", err) return false diff --git a/pkg/sidecar/appclone_fakeserver_test.go b/pkg/sidecar/appclone_fakeserver_test.go index a398e901b..5880c20f5 100644 --- a/pkg/sidecar/appclone_fakeserver_test.go +++ b/pkg/sidecar/appclone_fakeserver_test.go @@ -52,7 +52,7 @@ func newFakeServer(address string, cfg *Config) *fakeServer { func (fSrv *fakeServer) waitReady() error { retries := 0 for { - resp, err := http.Get(prepareUrl(fSrv.server.Addr, serverProbeEndpoint)) + resp, err := http.Get(prepareURL(fSrv.server.Addr, serverProbeEndpoint)) if err == nil && resp.StatusCode == 200 { return nil } diff --git a/pkg/sidecar/server.go b/pkg/sidecar/server.go index 783f78c29..e3fc03a8a 100644 --- a/pkg/sidecar/server.go +++ b/pkg/sidecar/server.go @@ -153,7 +153,7 @@ func maxClients(h http.Handler, n int) http.Handler { }) } -func prepareUrl(svc string, endpoint string) string { +func prepareURL(svc string, endpoint string) string { if !strings.Contains(svc, ":") { svc = fmt.Sprintf("%s:%d", svc, serverPort) } @@ -179,7 +179,7 @@ func fastTimeoutTransport(dialTimeout int) http.RoundTripper { func requestABackup(cfg *Config, host, endpoint string) (*http.Response, error) { log.Info("initialize a backup", "host", host, "endpoint", endpoint) - req, err := http.NewRequest("GET", prepareUrl(host, endpoint), nil) + req, err := http.NewRequest("GET", prepareURL(host, endpoint), nil) if err != nil { return nil, fmt.Errorf("fail to create request: %s", err) } From 8c5a01f1f31b43a60ec1336652f443a9b77e861e Mon Sep 17 00:00:00 2001 From: dougfales Date: Wed, 6 Nov 2019 12:01:04 -0700 Subject: [PATCH 03/13] License headers on new files. --- pkg/sidecar/appclone_fakeserver_test.go | 16 ++++++++++++++++ pkg/sidecar/appclone_test.go | 16 ++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/pkg/sidecar/appclone_fakeserver_test.go b/pkg/sidecar/appclone_fakeserver_test.go index 5880c20f5..fbccfe9ff 100644 --- a/pkg/sidecar/appclone_fakeserver_test.go +++ b/pkg/sidecar/appclone_fakeserver_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2019 Harvest + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + package sidecar import ( diff --git a/pkg/sidecar/appclone_test.go b/pkg/sidecar/appclone_test.go index 3e796459f..ec0168fce 100644 --- a/pkg/sidecar/appclone_test.go +++ b/pkg/sidecar/appclone_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2019 Harvest + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package sidecar import ( From ba6d7bfca35d76ce693ea62a5ec3b5d2dc34c145 Mon Sep 17 00:00:00 2001 From: dougfales Date: Wed, 6 Nov 2019 13:47:59 -0700 Subject: [PATCH 04/13] Skip backup data truncation tests when xbstream is not available. --- pkg/sidecar/appclone_test.go | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/pkg/sidecar/appclone_test.go b/pkg/sidecar/appclone_test.go index ec0168fce..390d773f2 100644 --- a/pkg/sidecar/appclone_test.go +++ b/pkg/sidecar/appclone_test.go @@ -22,6 +22,7 @@ import ( . "github.com/onsi/gomega" "io/ioutil" "os" + "os/exec" "path" ) @@ -37,6 +38,7 @@ var _ = Describe("Test RunCloneCommand cloning logic", func() { // each "service" on its own port. masterServiceAddr = fmt.Sprintf(":%d", serverPort) healthyReplicasServiceAddr = ":8081" + skipTruncatedDataTests = false ) setupFakeDataDir := func() { @@ -86,6 +88,19 @@ var _ = Describe("Test RunCloneCommand cloning logic", func() { xtrabackupCommand = "echo" } + disableXbStreamIfNotAvailable := func() { + if _, err := exec.LookPath(xbStreamCommand); err != nil { + xbStreamCommand = "echo" + skipTruncatedDataTests = true + } + } + + expectBackupFileToBeCreated := func() { + if !skipTruncatedDataTests { + Expect(fakeBackupFile).Should(BeAnExistingFile()) + } + } + BeforeSuite(func() { cfg = &Config{ masterService: "localhost" + masterServiceAddr, @@ -94,6 +109,7 @@ var _ = Describe("Test RunCloneCommand cloning logic", func() { setupFakeDataDir() disableXtraBackup() + disableXbStreamIfNotAvailable() }) AfterSuite(func() { @@ -134,7 +150,8 @@ var _ = Describe("Test RunCloneCommand cloning logic", func() { Expect(fakeReplicaServer.backupRequestsReceived()).To(Equal(1)) Expect(fakeMasterServer.backupRequestsReceived()).To(Equal(0)) - Expect(fakeBackupFile).Should(BeAnExistingFile()) + expectBackupFileToBeCreated() + }) Context("with truncated xbstream data from replicas", func() { @@ -144,6 +161,11 @@ var _ = Describe("Test RunCloneCommand cloning logic", func() { }) It("cloneFromSource should clean up the data directory after failure", func() { + + if skipTruncatedDataTests { + Skip("Skipping tests for truncated backup stream because no xbstream executable was found.") + } + Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) err := cloneFromSource(cfg, healthyReplicasServiceAddr) @@ -156,6 +178,11 @@ var _ = Describe("Test RunCloneCommand cloning logic", func() { }) It("should not fall back to master service", func() { + + if skipTruncatedDataTests { + Skip("Skipping tests for truncated backup stream because no xbstream executable was found.") + } + Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) err := RunCloneCommand(cfg) @@ -164,7 +191,7 @@ var _ = Describe("Test 
RunCloneCommand cloning logic", func() { Expect(fakeReplicaServer.backupRequestsReceived()).To(Equal(1)) Expect(fakeMasterServer.backupRequestsReceived()).To(Equal(0)) - Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) + expectBackupFileToBeCreated() }) }) @@ -216,7 +243,7 @@ var _ = Describe("Test RunCloneCommand cloning logic", func() { Expect(fakeReplicaServer.backupRequestsReceived()).To(Equal(0)) Expect(fakeMasterServer.backupRequestsReceived()).To(Equal(1)) - Expect(fakeBackupFile).Should(BeAnExistingFile()) + expectBackupFileToBeCreated() }) }) From b69f5e7aa8b8d60b70479f5b88165287d9eb8594 Mon Sep 17 00:00:00 2001 From: dougfales Date: Wed, 6 Nov 2019 14:10:52 -0700 Subject: [PATCH 05/13] xtrbackup -> xtrabackup. --- pkg/sidecar/appclone_fakeserver_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sidecar/appclone_fakeserver_test.go b/pkg/sidecar/appclone_fakeserver_test.go index fbccfe9ff..fd0793cbf 100644 --- a/pkg/sidecar/appclone_fakeserver_test.go +++ b/pkg/sidecar/appclone_fakeserver_test.go @@ -129,7 +129,7 @@ func (fSrv *fakeServer) backupHandler(w http.ResponseWriter, req *http.Request) // Error: return http status code of 500 if fSrv.simulateError { - http.Error(w, "xtrbackup failed", http.StatusInternalServerError) + http.Error(w, "xtrabackup failed", http.StatusInternalServerError) return } From 97e5ab7b0d6c2765da62abb97b4e9b78dfcdbc56 Mon Sep 17 00:00:00 2001 From: dougfales Date: Fri, 8 Nov 2019 06:02:55 -0700 Subject: [PATCH 06/13] Broken test assertion. --- pkg/sidecar/appclone_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sidecar/appclone_test.go b/pkg/sidecar/appclone_test.go index 390d773f2..1df96b00b 100644 --- a/pkg/sidecar/appclone_test.go +++ b/pkg/sidecar/appclone_test.go @@ -191,7 +191,7 @@ var _ = Describe("Test RunCloneCommand cloning logic", func() { Expect(fakeReplicaServer.backupRequestsReceived()).To(Equal(1)) Expect(fakeMasterServer.backupRequestsReceived()).To(Equal(0)) - expectBackupFileToBeCreated() + Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) }) }) From 6c0031785cdeaaac3ba74f468e6bfc7a6941e1d2 Mon Sep 17 00:00:00 2001 From: dougfales Date: Fri, 8 Nov 2019 09:53:06 -0700 Subject: [PATCH 07/13] Better name for custom transport with timeout. 
---
 pkg/sidecar/appclone.go  | 2 +-
 pkg/sidecar/constants.go | 3 +++
 pkg/sidecar/server.go    | 6 +++---
 3 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/pkg/sidecar/appclone.go b/pkg/sidecar/appclone.go
index 963d3e7cb..1d9e3ac9e 100644
--- a/pkg/sidecar/appclone.go
+++ b/pkg/sidecar/appclone.go
@@ -100,7 +100,7 @@ func isServiceAvailable(svc string) bool {
 	}
 
 	client := &http.Client{}
-	client.Transport = fastTimeoutTransport(5)
+	client.Transport = transportWithTimeout(serverConnectTimeout)
 	resp, err := client.Do(req)
 	if err != nil {
 		log.Info("service was not available", "service", svc, "error", err)
diff --git a/pkg/sidecar/constants.go b/pkg/sidecar/constants.go
index f5c964876..ec3ba1f90 100644
--- a/pkg/sidecar/constants.go
+++ b/pkg/sidecar/constants.go
@@ -18,6 +18,7 @@ package sidecar
 
 import (
 	"strconv"
+	"time"
 
 	// add mysql driver
 	_ "github.com/go-sql-driver/mysql"
@@ -65,6 +66,8 @@ var (
 	serverProbeEndpoint = constants.SidecarServerProbePath
 	// ServerBackupEndpoint is the http server endpoint for backups
 	serverBackupEndpoint = "/xbackup"
+	// serverConnectTimeout is the connect timeout (not http timeout) for requesting a backup from the sidecar server
+	serverConnectTimeout = 5 * time.Second
 
 	// xtrabackup Executable Name
 	xtrabackupCommand = "xtrabackup"
diff --git a/pkg/sidecar/server.go b/pkg/sidecar/server.go
index e3fc03a8a..0e36394b5 100644
--- a/pkg/sidecar/server.go
+++ b/pkg/sidecar/server.go
@@ -160,11 +160,11 @@ func prepareURL(svc string, endpoint string) string {
 	return fmt.Sprintf("http://%s%s", svc, endpoint)
 }
 
-func fastTimeoutTransport(dialTimeout int) http.RoundTripper {
+func transportWithTimeout(connectTimeout time.Duration) http.RoundTripper {
 	return &http.Transport{
 		Proxy: http.ProxyFromEnvironment,
 		DialContext: (&net.Dialer{
-			Timeout:   time.Duration(dialTimeout) * time.Second,
+			Timeout:   connectTimeout,
 			KeepAlive: 30 * time.Second,
 			DualStack: true,
 		}).DialContext,
@@ -188,7 +188,7 @@ func requestABackup(cfg *Config, host, endpoint string) (*http.Response, error)
 	req.SetBasicAuth(cfg.BackupUser, cfg.BackupPassword)
 
 	client := &http.Client{}
-	client.Transport = fastTimeoutTransport(5)
+	client.Transport = transportWithTimeout(serverConnectTimeout)
 	resp, err := client.Do(req)
 
 	if err != nil || resp.StatusCode != 200 {

From 5690efd2fa3a89a61a68b93c240dd29f31b98ce0 Mon Sep 17 00:00:00 2001
From: dougfales
Date: Fri, 8 Nov 2019 10:38:27 -0700
Subject: [PATCH 08/13] Adding a specific case for cluster initialization.

When no services are available, we want to die and let k8s re-initialize
the pod, *unless* this is the initial pod, in which case it is valid to
allow initialization to continue without cloning from a service or bucket.
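"Initial pod" here means StatefulSet ordinal zero. The new IsFirstPodInSet
helper reuses getOrdinalFromHostname, but the decision boils down to
something like this sketch (the hostname follows the usual
"<cluster>-mysql-<ordinal>" StatefulSet convention; imports of "strings"
and "strconv" assumed):

    func isFirstPod(hostname string) bool {
        // "my-cluster-mysql-0" -> true: may bootstrap an empty instance
        // "my-cluster-mysql-1" -> false: erroring out lets k8s restart
        // the init container and retry the clone
        idx := strings.LastIndex(hostname, "-")
        ordinal, err := strconv.Atoi(hostname[idx+1:])
        return err == nil && ordinal == 0
    }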
+// ------------------------------------------------------------------------------------ func RunCloneCommand(cfg *Config) error { log.Info("cloning command", "host", cfg.Hostname) @@ -79,9 +83,11 @@ func RunCloneCommand(cfg *Config) error { if err := cloneFromBucket(cfg.InitBucketURL); err != nil { return fmt.Errorf("failed to clone from bucket, err: %s", err) } - } else { - log.Info("nothing to clone from: no existing data found, no replicas and no master available, and no clone bucket url found") + } else if cfg.IsFirstPodInSet() { + log.Info("nothing to clone from: empty cluster initializing") return nil + } else { + return fmt.Errorf("nothing to clone from: no existing data found, no replicas and no master available, and no clone bucket url found") } // prepare backup diff --git a/pkg/sidecar/appclone_test.go b/pkg/sidecar/appclone_test.go index 1df96b00b..e7abfbbea 100644 --- a/pkg/sidecar/appclone_test.go +++ b/pkg/sidecar/appclone_test.go @@ -255,7 +255,8 @@ var _ = Describe("Test RunCloneCommand cloning logic", func() { stopFakeMasterService() }) - It("should return nil", func() { + It("should return nil for first pod", func() { + cfg.Hostname = "mysql-mysql-0" Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) err := RunCloneCommand(cfg) @@ -264,6 +265,16 @@ var _ = Describe("Test RunCloneCommand cloning logic", func() { Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) }) + It("should return an error for subsequent pods", func() { + cfg.Hostname = "mysql-mysql-1" + Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) + + err := RunCloneCommand(cfg) + Expect(err).To(HaveOccurred()) + + Expect(fakeBackupFile).ShouldNot(BeAnExistingFile()) + }) + }) }) diff --git a/pkg/sidecar/configs.go b/pkg/sidecar/configs.go index 9e5617437..50dea25f9 100644 --- a/pkg/sidecar/configs.go +++ b/pkg/sidecar/configs.go @@ -120,6 +120,11 @@ func (cfg *Config) MysqlDSN() string { ) } +func (cfg *Config) IsFirstPodInSet() bool { + ordinal := getOrdinalFromHostname(cfg.Hostname) + return ordinal == 0 +} + // ShouldCloneFromBucket returns true if it's time to initialize from a bucket URL provided func (cfg *Config) ShouldCloneFromBucket() bool { return !cfg.ExistsMySQLData && cfg.ServerID() == 100 && len(cfg.InitBucketURL) != 0 From 056c880ca20c56936964eb9facad5bab80707bd4 Mon Sep 17 00:00:00 2001 From: dougfales Date: Mon, 11 Nov 2019 13:58:55 -0700 Subject: [PATCH 09/13] Exit with a different error code in the event of a clone failure. --- cmd/mysql-operator-sidecar/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/mysql-operator-sidecar/main.go b/cmd/mysql-operator-sidecar/main.go index 009add948..ce7b4d0e5 100644 --- a/cmd/mysql-operator-sidecar/main.go +++ b/cmd/mysql-operator-sidecar/main.go @@ -65,7 +65,7 @@ func main() { Run: func(cmd *cobra.Command, args []string) { if err := sidecar.RunCloneCommand(cfg); err != nil { log.Error(err, "clone command failed") - os.Exit(1) + os.Exit(8) } if err := sidecar.RunConfigCommand(cfg); err != nil { log.Error(err, "init command failed") From cab3824fb78c2f46199c904ca71e038ceb543631 Mon Sep 17 00:00:00 2001 From: dougfales Date: Mon, 11 Nov 2019 13:59:10 -0700 Subject: [PATCH 10/13] Comments. 
--- pkg/sidecar/appclone.go | 2 +- pkg/sidecar/configs.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/sidecar/appclone.go b/pkg/sidecar/appclone.go index e0d934985..c7318e475 100644 --- a/pkg/sidecar/appclone.go +++ b/pkg/sidecar/appclone.go @@ -52,7 +52,7 @@ import ( // master; bucket URL exists | The assumption is that this is the bootstrap case: the // | very first mysql pod is being initialized. // ------------------------------------------------------------------------------------ -// No healthy replcias; no | If this is the first pod in the cluster, then allow it +// No healthy replicas; no | If this is the first pod in the cluster, then allow it // master; no bucket URL | to initialize as an empty instance, otherwise, return an // | error to allow k8s to kill and restart the pod. // ------------------------------------------------------------------------------------ diff --git a/pkg/sidecar/configs.go b/pkg/sidecar/configs.go index 50dea25f9..d5831a398 100644 --- a/pkg/sidecar/configs.go +++ b/pkg/sidecar/configs.go @@ -120,6 +120,7 @@ func (cfg *Config) MysqlDSN() string { ) } +// IsFirstPodInSet returns true if this pod has an ordinal of 0, meaning it is the first one in the set func (cfg *Config) IsFirstPodInSet() bool { ordinal := getOrdinalFromHostname(cfg.Hostname) return ordinal == 0 From 466304627910ef212558f7ee0904619ef45fe4d5 Mon Sep 17 00:00:00 2001 From: dougfales Date: Mon, 11 Nov 2019 14:00:13 -0700 Subject: [PATCH 11/13] More forgiving timeout for this e2e test. --- test/e2e/cluster/cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/cluster/cluster.go b/test/e2e/cluster/cluster.go index 08693c081..7e34edbd3 100644 --- a/test/e2e/cluster/cluster.go +++ b/test/e2e/cluster/cluster.go @@ -147,7 +147,7 @@ var _ = Describe("Mysql cluster tests", func() { // delete PVC from master pod and wait for it to be removed pvcName := "data-" + podName - deletePVCSynchronously(f, pvcName, cluster.Namespace, 15 * time.Second) + deletePVCSynchronously(f, pvcName, cluster.Namespace, 30*time.Second) // now delete master pod err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Delete(podName, &meta.DeleteOptions{}) From f9cad01d97ebc7423615eccb2198fe6f7628354d Mon Sep 17 00:00:00 2001 From: dougfales Date: Mon, 11 Nov 2019 14:11:12 -0700 Subject: [PATCH 12/13] Remove unnecessary if. --- pkg/sidecar/appclone.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/sidecar/appclone.go b/pkg/sidecar/appclone.go index c7318e475..e029493f1 100644 --- a/pkg/sidecar/appclone.go +++ b/pkg/sidecar/appclone.go @@ -91,11 +91,7 @@ func RunCloneCommand(cfg *Config) error { } // prepare backup - if err := xtrabackupPrepareData(); err != nil { - return err - } - - return nil + return xtrabackupPrepareData() } func isServiceAvailable(svc string) bool { From e3bb4a0ca323b290c270212d5d314792bc492c77 Mon Sep 17 00:00:00 2001 From: dougfales Date: Thu, 14 Nov 2019 12:35:33 -0700 Subject: [PATCH 13/13] Use addr.Suggest() instead of hardcoded ports for mock test server. 
--- pkg/sidecar/appclone_test.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/pkg/sidecar/appclone_test.go b/pkg/sidecar/appclone_test.go index e7abfbbea..d0b683352 100644 --- a/pkg/sidecar/appclone_test.go +++ b/pkg/sidecar/appclone_test.go @@ -24,6 +24,7 @@ import ( "os" "os/exec" "path" + "sigs.k8s.io/testing_frameworks/integration/addr" ) var _ = Describe("Test RunCloneCommand cloning logic", func() { @@ -36,11 +37,20 @@ var _ = Describe("Test RunCloneCommand cloning logic", func() { // Normally, these are true k8s services, each listening on // SidecarServerPort. Since we can't simulate that in unit tests, we put // each "service" on its own port. - masterServiceAddr = fmt.Sprintf(":%d", serverPort) - healthyReplicasServiceAddr = ":8081" + + masterServiceAddr string + healthyReplicasServiceAddr string skipTruncatedDataTests = false ) + generateAddress := func() string { + port, host, err := addr.Suggest() + if err != nil { + panic("couldn't generate local address for fakeserver") + } + return fmt.Sprintf("%s:%d", host, port) + } + setupFakeDataDir := func() { tempDataDir, err := ioutil.TempDir("", "mysql-operator-tests") Expect(err).ToNot(HaveOccurred()) @@ -102,9 +112,12 @@ var _ = Describe("Test RunCloneCommand cloning logic", func() { } BeforeSuite(func() { + masterServiceAddr = generateAddress() + healthyReplicasServiceAddr = generateAddress() + cfg = &Config{ - masterService: "localhost" + masterServiceAddr, - healthyReplicaCloneService: "localhost" + healthyReplicasServiceAddr, + masterService: masterServiceAddr, + healthyReplicaCloneService: healthyReplicasServiceAddr, } setupFakeDataDir()