From 40558fa4acf7436acbea34b33ec5dd20a2fa5a34 Mon Sep 17 00:00:00 2001 From: Swapnil Mhamane Date: Thu, 15 Nov 2018 09:53:14 +0530 Subject: [PATCH 1/3] Limit the parallalism for chunk upload Signed-off-by: Swapnil Mhamane --- cmd/initializer.go | 8 +- cmd/miscellaneous.go | 2 + cmd/restore.go | 8 +- cmd/server.go | 15 ++- cmd/snapshot.go | 16 ++- cmd/types.go | 8 +- pkg/etcdutil/defrag.go | 4 - pkg/etcdutil/defrag_test.go | 28 +++++ pkg/initializer/initializer.go | 2 +- pkg/initializer/validator/datavalidator.go | 2 +- pkg/snapstore/abs_snapstore.go | 107 +++++++++++-------- pkg/snapstore/gcs_snapstore.go | 102 +++++++++++------- pkg/snapstore/local_snapstore.go | 1 - pkg/snapstore/s3_snapstore.go | 118 +++++++++++---------- pkg/snapstore/snapstore_test.go | 2 +- pkg/snapstore/swift_snapstore.go | 112 +++++++++++-------- pkg/snapstore/types.go | 30 ++++-- pkg/snapstore/utils.go | 43 ++++++-- 18 files changed, 379 insertions(+), 229 deletions(-) diff --git a/cmd/initializer.go b/cmd/initializer.go index 774e542d7..50ac805dc 100644 --- a/cmd/initializer.go +++ b/cmd/initializer.go @@ -61,9 +61,11 @@ func NewInitializeCommand(stopCh <-chan struct{}) *cobra.Command { var snapstoreConfig *snapstore.Config if storageProvider != "" { snapstoreConfig = &snapstore.Config{ - Provider: storageProvider, - Container: storageContainer, - Prefix: path.Join(storagePrefix, backupFormatVersion), + Provider: storageProvider, + Container: storageContainer, + Prefix: path.Join(storagePrefix, backupFormatVersion), + MaxParallelChunkUploads: maxParallelChunkUploads, + TempDir: snapstoreTempDir, } } etcdInitializer := initializer.NewInitializer(options, snapstoreConfig, logger) diff --git a/cmd/miscellaneous.go b/cmd/miscellaneous.go index 931e25d6c..229af81ba 100644 --- a/cmd/miscellaneous.go +++ b/cmd/miscellaneous.go @@ -23,4 +23,6 @@ func initializeSnapstoreFlags(cmd *cobra.Command) { cmd.Flags().StringVar(&storageProvider, "storage-provider", "", "snapshot storage provider") cmd.Flags().StringVar(&storageContainer, "store-container", "", "container which will be used as snapstore") cmd.Flags().StringVar(&storagePrefix, "store-prefix", "", "prefix or directory inside container under which snapstore is created") + cmd.Flags().IntVar(&maxParallelChunkUploads, "max-parallel-chunk-uploads", 5, "maximum number of parallel chunk uploads allowed ") + cmd.Flags().StringVar(&snapstoreTempDir, "snapstore-temp-directory", "/tmp", "temporary directory for processing") } diff --git a/cmd/restore.go b/cmd/restore.go index e710b7b56..08c2f2d05 100644 --- a/cmd/restore.go +++ b/cmd/restore.go @@ -50,9 +50,11 @@ func NewRestoreCommand(stopCh <-chan struct{}) *cobra.Command { logger.Fatalf("failed parsing peers urls for restore cluster: %v", err) } snapstoreConfig := &snapstore.Config{ - Provider: storageProvider, - Container: storageContainer, - Prefix: path.Join(storagePrefix, backupFormatVersion), + Provider: storageProvider, + Container: storageContainer, + Prefix: path.Join(storagePrefix, backupFormatVersion), + MaxParallelChunkUploads: maxParallelChunkUploads, + TempDir: snapstoreTempDir, } store, err := snapstore.GetSnapstore(snapstoreConfig) if err != nil { diff --git a/cmd/server.go b/cmd/server.go index 12eb2f330..ab78caf1c 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -67,9 +67,11 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command { if storageProvider != "" { snapstoreConfig = &snapstore.Config{ - Provider: storageProvider, - Container: storageContainer, - Prefix: path.Join(storagePrefix, backupFormatVersion), + Provider: storageProvider, + Container: storageContainer, + Prefix: path.Join(storagePrefix, backupFormatVersion), + MaxParallelChunkUploads: maxParallelChunkUploads, + TempDir: snapstoreTempDir, } } @@ -135,8 +137,11 @@ func NewServerCommand(stopCh <-chan struct{}) *cobra.Command { ssrStopCh = make(chan struct{}) go handleSsrRequest(handler, ssr, ackCh, ssrStopCh, stopCh) go handleAckState(handler, ackCh) - go etcdutil.DefragDataPeriodically(stopCh, tlsConfig, time.Duration(defragmentationPeriodHours)*time.Hour, time.Duration(etcdConnectionTimeout)*time.Second, ssr.TriggerFullSnapshot) - + if defragmentationPeriodHours < 1 { + logger.Infof("Disabling defragmentation since defragmentation period [%d] is less than 1", defragmentationPeriodHours) + } else { + go etcdutil.DefragDataPeriodically(stopCh, tlsConfig, time.Duration(defragmentationPeriodHours)*time.Hour, time.Duration(etcdConnectionTimeout)*time.Second, ssr.TriggerFullSnapshot) + } for { logger.Infof("Probing etcd...") select { diff --git a/cmd/snapshot.go b/cmd/snapshot.go index c26b2f963..c5e6e3f04 100644 --- a/cmd/snapshot.go +++ b/cmd/snapshot.go @@ -33,9 +33,11 @@ func NewSnapshotCommand(stopCh <-chan struct{}) *cobra.Command { storing snapshots on various cloud storage providers as well as local disk location.`, Run: func(cmd *cobra.Command, args []string) { snapstoreConfig := &snapstore.Config{ - Provider: storageProvider, - Container: storageContainer, - Prefix: path.Join(storagePrefix, backupFormatVersion), + Provider: storageProvider, + Container: storageContainer, + Prefix: path.Join(storagePrefix, backupFormatVersion), + MaxParallelChunkUploads: maxParallelChunkUploads, + TempDir: snapstoreTempDir, } ss, err := snapstore.GetSnapstore(snapstoreConfig) if err != nil { @@ -64,7 +66,13 @@ storing snapshots on various cloud storage providers as well as local disk locat ssr := snapshotter.NewSnapshotter( logger, snapshotterConfig) - go etcdutil.DefragDataPeriodically(stopCh, tlsConfig, time.Duration(defragmentationPeriodHours)*time.Hour, time.Duration(etcdConnectionTimeout)*time.Second, ssr.TriggerFullSnapshot) + + if defragmentationPeriodHours < 1 { + logger.Infof("Disabling defragmentation since defragmentation period [%d] is less than 1", defragmentationPeriodHours) + } else { + go etcdutil.DefragDataPeriodically(stopCh, tlsConfig, time.Duration(defragmentationPeriodHours)*time.Hour, time.Duration(etcdConnectionTimeout)*time.Second, ssr.TriggerFullSnapshot) + } + gcStopCh := make(chan struct{}) go ssr.RunGarbageCollector(gcStopCh) if err := ssr.Run(stopCh, true); err != nil { diff --git a/cmd/types.go b/cmd/types.go index f38996aec..96b32f161 100644 --- a/cmd/types.go +++ b/cmd/types.go @@ -57,9 +57,11 @@ var ( restoreMaxFetchers int //snapstore flags - storageProvider string - storageContainer string - storagePrefix string + storageProvider string + storageContainer string + storagePrefix string + maxParallelChunkUploads int + snapstoreTempDir string ) var emptyStruct struct{} diff --git a/pkg/etcdutil/defrag.go b/pkg/etcdutil/defrag.go index 8ad9f94b6..9025c6b0d 100644 --- a/pkg/etcdutil/defrag.go +++ b/pkg/etcdutil/defrag.go @@ -65,10 +65,6 @@ func defragData(tlsConfig *TLSConfig, etcdConnectionTimeout time.Duration) error // DefragDataPeriodically defragments the data directory of each etcd member. func DefragDataPeriodically(stopCh <-chan struct{}, tlsConfig *TLSConfig, defragmentationPeriod, etcdConnectionTimeout time.Duration, callback func()) { - if defragmentationPeriod <= time.Hour { - logrus.Infof("Disabling defragmentation since defragmentation period [%d] is less than 1", defragmentationPeriod) - return - } logrus.Infof("Defragmentation period :%d hours", defragmentationPeriod/time.Hour) for { select { diff --git a/pkg/etcdutil/defrag_test.go b/pkg/etcdutil/defrag_test.go index 1df0bdb2d..b77789e8d 100644 --- a/pkg/etcdutil/defrag_test.go +++ b/pkg/etcdutil/defrag_test.go @@ -74,5 +74,33 @@ var _ = Describe("Defrag", func() { Expect(newStatus.Header.GetRevision()).Should(BeNumerically("==", oldStatus.Header.GetRevision())) }) + + It("should defrag periodically with callback", func() { + defragCount := 0 + expectedDefragCount := 2 + defragmentationPeriod := time.Duration(30) * time.Second + client, err := GetTLSClientForEtcd(tlsConfig) + Expect(err).ShouldNot(HaveOccurred()) + defer client.Close() + ctx, cancel := context.WithTimeout(context.TODO(), etcdDialTimeout) + oldStatus, err := client.Status(ctx, endpoints[0]) + cancel() + Expect(err).ShouldNot(HaveOccurred()) + stopCh := make(chan struct{}) + time.AfterFunc(time.Second*time.Duration(75), func() { + close(stopCh) + }) + + DefragDataPeriodically(stopCh, tlsConfig, defragmentationPeriod, etcdConnectionTimeout, func() { defragCount++ }) + + ctx, cancel = context.WithTimeout(context.TODO(), etcdDialTimeout) + newStatus, err := client.Status(ctx, endpoints[0]) + cancel() + Expect(err).ShouldNot(HaveOccurred()) + + Expect(defragCount).Should(BeNumerically("==", expectedDefragCount)) + Expect(newStatus.DbSize).Should(BeNumerically("<", oldStatus.DbSize)) + Expect(newStatus.Header.GetRevision()).Should(BeNumerically("==", oldStatus.Header.GetRevision())) + }) }) }) diff --git a/pkg/initializer/initializer.go b/pkg/initializer/initializer.go index 397b39c06..5bc035cf3 100644 --- a/pkg/initializer/initializer.go +++ b/pkg/initializer/initializer.go @@ -50,7 +50,7 @@ func (e *EtcdInitializer) Initialize() error { return fmt.Errorf("error while restoring corrupt data: %v", err) } } - return err + return nil } //NewInitializer creates an etcd initializer object. diff --git a/pkg/initializer/validator/datavalidator.go b/pkg/initializer/validator/datavalidator.go index cff68516f..62d22a5c3 100644 --- a/pkg/initializer/validator/datavalidator.go +++ b/pkg/initializer/validator/datavalidator.go @@ -86,7 +86,7 @@ func (d *DataValidator) snapDir() string { return filepath.Join(d.memberDir(), " func (d *DataValidator) backendPath() string { return filepath.Join(d.snapDir(), "db") } //Validate performs the steps required to validate data for Etcd instance. -// The steps involed are: +// The steps involved are: // * Check if data directory exists. // - If data directory exists // * Check for data directory structure. diff --git a/pkg/snapstore/abs_snapstore.go b/pkg/snapstore/abs_snapstore.go index 5d309a8db..0596651a6 100644 --- a/pkg/snapstore/abs_snapstore.go +++ b/pkg/snapstore/abs_snapstore.go @@ -22,8 +22,7 @@ import ( "os" "path" "sort" - "strings" - "time" + "sync" "github.com/Azure/azure-sdk-for-go/storage" "github.com/sirupsen/logrus" @@ -36,13 +35,15 @@ const ( // ABSSnapStore is an ABS backed snapstore. type ABSSnapStore struct { - SnapStore prefix string container *storage.Container + // maxParallelChunkUploads hold the maximum number of parallel chunk uploads allowed. + maxParallelChunkUploads int + tempDir string } // NewABSSnapStore create new ABSSnapStore from shared configuration with specified bucket -func NewABSSnapStore(container, prefix string) (*ABSSnapStore, error) { +func NewABSSnapStore(container, prefix, tempDir string, maxParallelChunkUploads int) (*ABSSnapStore, error) { storageAccount, err := GetEnvVarOrError(absStorageAccount) if err != nil { return nil, err @@ -58,11 +59,11 @@ func NewABSSnapStore(container, prefix string) (*ABSSnapStore, error) { return nil, fmt.Errorf("create ABS client failed: %v", err) } - return GetSnapstoreFromClient(container, prefix, &client) + return GetSnapstoreFromClient(container, prefix, tempDir, maxParallelChunkUploads, &client) } // GetSnapstoreFromClient returns a new ABS object for a given container using the supplied storageClient -func GetSnapstoreFromClient(container, prefix string, storageClient *storage.Client) (*ABSSnapStore, error) { +func GetSnapstoreFromClient(container, prefix, tempDir string, maxParallelChunkUploads int, storageClient *storage.Client) (*ABSSnapStore, error) { client := storageClient.GetBlobService() // Check if supplied container exists @@ -77,8 +78,10 @@ func GetSnapstoreFromClient(container, prefix string, storageClient *storage.Cli } return &ABSSnapStore{ - prefix: prefix, - container: containerRef, + prefix: prefix, + container: containerRef, + maxParallelChunkUploads: maxParallelChunkUploads, + tempDir: tempDir, }, nil } @@ -115,7 +118,7 @@ func (a *ABSSnapStore) List() (SnapList, error) { // Save will write the snapshot to store func (a *ABSSnapStore) Save(snap Snapshot, r io.Reader) error { // Save it locally - tmpfile, err := ioutil.TempFile(tmpDir, tmpBackupFilePrefix) + tmpfile, err := ioutil.TempFile(a.tempDir, tmpBackupFilePrefix) if err != nil { return fmt.Errorf("failed to create snapshot tempfile: %v", err) } @@ -129,7 +132,6 @@ func (a *ABSSnapStore) Save(snap Snapshot, r io.Reader) error { } var ( - errCh = make(chan chunkUploadError) chunkSize = minChunkSize noOfChunks = size / chunkSize ) @@ -137,13 +139,33 @@ func (a *ABSSnapStore) Save(snap Snapshot, r io.Reader) error { noOfChunks++ } + var ( + chunkUploadCh = make(chan chunk, noOfChunks) + resCh = make(chan chunkUploadResult, noOfChunks) + wg sync.WaitGroup + cancelCh = make(chan struct{}) + ) + + for i := 0; i < a.maxParallelChunkUploads; i++ { + wg.Add(1) + go a.blockUploader(&wg, cancelCh, &snap, tmpfile, chunkUploadCh, resCh) + } logrus.Infof("Uploading snapshot of size: %d, chunkSize: %d, noOfChunks: %d", size, chunkSize, noOfChunks) - for offset := int64(0); offset <= size; offset += int64(chunkSize) { - go retryBlockUpload(a, &snap, tmpfile, offset, chunkSize, errCh) + for offset, index := int64(0), 1; offset <= size; offset += int64(chunkSize) { + newChunk := chunk{ + offset: offset, + size: chunkSize, + id: index, + } + chunkUploadCh <- newChunk + index++ } + logrus.Infof("Triggered chunk upload for all chunks, total: %d", noOfChunks) - snapshotErr := collectChunkUploadError(errCh, noOfChunks) - if len(snapshotErr) == 0 { + snapshotErr := collectChunkUploadError(chunkUploadCh, resCh, cancelCh, noOfChunks) + wg.Wait() + + if snapshotErr == nil { logrus.Info("All chunk uploaded successfully. Uploading blocklist.") blobName := path.Join(a.prefix, snap.SnapDir, snap.SnapName) blob := a.container.GetBlobReference(blobName) @@ -155,16 +177,17 @@ func (a *ABSSnapStore) Save(snap Snapshot, r io.Reader) error { } blockList = append(blockList, block) } - return blob.PutBlockList(blockList, &storage.PutBlockListOptions{}) - } - var collectedErr []string - for _, chunkErr := range snapshotErr { - collectedErr = append(collectedErr, fmt.Sprintf("failed uploading chunk with offset %d with error %v", chunkErr.offset, chunkErr.err)) + if err := blob.PutBlockList(blockList, &storage.PutBlockListOptions{}); err != nil { + return fmt.Errorf("failed uploading blocklist for snapshot with error: %v", err) + } + logrus.Info("Blocklist uploaded successfully.") + return nil } - return fmt.Errorf(strings.Join(collectedErr, "\n")) + + return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err) } -func uploadBlock(s *ABSSnapStore, snap *Snapshot, file *os.File, offset, chunkSize int64) error { +func (a *ABSSnapStore) uploadBlock(snap *Snapshot, file *os.File, offset, chunkSize int64) error { fileInfo, err := file.Stat() if err != nil { return err @@ -176,38 +199,34 @@ func uploadBlock(s *ABSSnapStore, snap *Snapshot, file *os.File, offset, chunkSi } sr := io.NewSectionReader(file, offset, chunkSize) - blobName := path.Join(s.prefix, snap.SnapDir, snap.SnapName) - blob := s.container.GetBlobReference(blobName) + blobName := path.Join(a.prefix, snap.SnapDir, snap.SnapName) + blob := a.container.GetBlobReference(blobName) partNumber := ((offset / chunkSize) + 1) blockID := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%010d", partNumber))) err = blob.PutBlockWithLength(blockID, uint64(size), sr, &storage.PutBlockOptions{}) return err } -func retryBlockUpload(s *ABSSnapStore, snap *Snapshot, file *os.File, offset, chunkSize int64, errCh chan<- chunkUploadError) { - var ( - maxAttempts uint = 5 - curAttempt uint = 1 - err error - ) +func (a *ABSSnapStore) blockUploader(wg *sync.WaitGroup, stopCh <-chan struct{}, snap *Snapshot, file *os.File, chunkUploadCh chan chunk, errCh chan<- chunkUploadResult) { + defer wg.Done() for { - logrus.Infof("Uploading chunk with offset : %d, attempt: %d", offset, curAttempt) - err = uploadBlock(s, snap, file, offset, chunkSize) - logrus.Infof("For chunk upload of offset %d, err %v", offset, err) - if err == nil || curAttempt == maxAttempts { - break - } - delayTime := (1 << curAttempt) - curAttempt++ - logrus.Warnf("Will try to upload chunk with offset: %d at attempt %d after %d seconds", offset, curAttempt, delayTime) - time.Sleep((time.Duration)(delayTime) * time.Second) - } + select { + case <-stopCh: + return + case chunk, more := <-chunkUploadCh: + if !more { + return + } + logrus.Infof("Uploading chunk with offset : %d, attempt: %d", chunk.offset, chunk.attempt) + err := a.uploadBlock(snap, file, chunk.offset, chunk.size) + logrus.Infof("For chunk upload of offset %d, err %v", chunk.offset, err) + errCh <- chunkUploadResult{ + err: nil, + chunk: &chunk, + } - errCh <- chunkUploadError{ - err: err, - offset: offset, + } } - return } // Delete should delete the snapshot file from store diff --git a/pkg/snapstore/gcs_snapstore.go b/pkg/snapstore/gcs_snapstore.go index 7f3430467..913d22b43 100644 --- a/pkg/snapstore/gcs_snapstore.go +++ b/pkg/snapstore/gcs_snapstore.go @@ -24,7 +24,7 @@ import ( "path" "sort" "strings" - "time" + "sync" "cloud.google.com/go/storage" "github.com/sirupsen/logrus" @@ -33,11 +33,13 @@ import ( // GCSSnapStore is snapstore with local disk as backend type GCSSnapStore struct { - SnapStore prefix string client *storage.Client bucket string ctx context.Context + // maxParallelChunkUploads hold the maximum number of parallel chunk uploads allowed. + maxParallelChunkUploads int + tempDir string } const ( @@ -45,7 +47,7 @@ const ( ) // NewGCSSnapStore create new S3SnapStore from shared configuration with specified bucket -func NewGCSSnapStore(bucket, prefix string) (*GCSSnapStore, error) { +func NewGCSSnapStore(bucket, prefix, tempDir string, maxParallelChunkUploads int) (*GCSSnapStore, error) { ctx := context.TODO() gcsClient, err := storage.NewClient(ctx) if err != nil { @@ -53,10 +55,12 @@ func NewGCSSnapStore(bucket, prefix string) (*GCSSnapStore, error) { } return &GCSSnapStore{ - prefix: prefix, - client: gcsClient, - bucket: bucket, - ctx: ctx, + prefix: prefix, + client: gcsClient, + bucket: bucket, + ctx: ctx, + maxParallelChunkUploads: maxParallelChunkUploads, + tempDir: tempDir, }, nil } @@ -70,7 +74,7 @@ func (s *GCSSnapStore) Fetch(snap Snapshot) (io.ReadCloser, error) { // Save will write the snapshot to store func (s *GCSSnapStore) Save(snap Snapshot, r io.Reader) error { // Save it locally - tmpfile, err := ioutil.TempFile(tmpDir, tmpBackupFilePrefix) + tmpfile, err := ioutil.TempFile(s.tempDir, tmpBackupFilePrefix) if err != nil { return fmt.Errorf("failed to create snapshot tempfile: %v", err) } @@ -84,7 +88,6 @@ func (s *GCSSnapStore) Save(snap Snapshot, r io.Reader) error { } var ( - errCh = make(chan chunkUploadError) chunkSize = int64(math.Max(float64(minChunkSize), float64(size/gcsNoOfChunk))) noOfChunks = size / chunkSize ) @@ -92,13 +95,34 @@ func (s *GCSSnapStore) Save(snap Snapshot, r io.Reader) error { noOfChunks++ } + var ( + chunkUploadCh = make(chan chunk, noOfChunks) + resCh = make(chan chunkUploadResult, noOfChunks) + wg sync.WaitGroup + cancelCh = make(chan struct{}) + ) + + for i := 0; i < s.maxParallelChunkUploads; i++ { + wg.Add(1) + go s.componentUploader(&wg, cancelCh, &snap, tmpfile, chunkUploadCh, resCh) + } + logrus.Infof("Uploading snapshot of size: %d, chunkSize: %d, noOfChunks: %d", size, chunkSize, noOfChunks) - for offset := int64(0); offset <= size; offset += int64(chunkSize) { - go retryComponentUpload(s, &snap, tmpfile, offset, chunkSize, errCh) + for offset, index := int64(0), 1; offset <= size; offset += int64(chunkSize) { + newChunk := chunk{ + id: index, + offset: offset, + size: chunkSize, + } + chunkUploadCh <- newChunk + index++ } + logrus.Infof("Triggered chunk upload for all chunks, total: %d", noOfChunks) - snapshotErr := collectChunkUploadError(errCh, noOfChunks) - if len(snapshotErr) == 0 { + snapshotErr := collectChunkUploadError(chunkUploadCh, resCh, cancelCh, noOfChunks) + wg.Wait() + + if snapshotErr == nil { logrus.Info("All chunk uploaded successfully. Uploading composite object.") bh := s.client.Bucket(s.bucket) var subObjects []*storage.ObjectHandle @@ -112,17 +136,17 @@ func (s *GCSSnapStore) Save(snap Snapshot, r io.Reader) error { c := obj.ComposerFrom(subObjects...) ctx, cancel := context.WithTimeout(context.TODO(), chunkUploadTimeout) defer cancel() - _, err := c.Run(ctx) - return err - } - var collectedErr []string - for _, chunkErr := range snapshotErr { - collectedErr = append(collectedErr, fmt.Sprintf("failed uploading chunk with offset %d with error %v", chunkErr.offset, chunkErr.err)) + if _, err := c.Run(ctx); err != nil { + return fmt.Errorf("failed uploading composite object for snapshot with error: %v", err) + } + logrus.Info("Composite object uploaded successfully.") + return nil } - return fmt.Errorf(strings.Join(collectedErr, "\n")) + + return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err) } -func uploadComponent(s *GCSSnapStore, snap *Snapshot, file *os.File, offset, chunkSize int64) error { +func (s *GCSSnapStore) uploadComponent(snap *Snapshot, file *os.File, offset, chunkSize int64) error { fileInfo, err := file.Stat() if err != nil { return err @@ -147,28 +171,24 @@ func uploadComponent(s *GCSSnapStore, snap *Snapshot, file *os.File, offset, chu return w.Close() } -func retryComponentUpload(s *GCSSnapStore, snap *Snapshot, file *os.File, offset, chunkSize int64, errCh chan<- chunkUploadError) { - var ( - maxAttempts uint = 5 - curAttempt uint = 1 - err error - ) +func (s *GCSSnapStore) componentUploader(wg *sync.WaitGroup, stopCh <-chan struct{}, snap *Snapshot, file *os.File, chunkUploadCh chan chunk, errCh chan<- chunkUploadResult) { + defer wg.Done() for { - logrus.Infof("Uploading chunk with offset : %d, attempt: %d", offset, curAttempt) - err = uploadComponent(s, snap, file, offset, chunkSize) - logrus.Infof("For chunk upload of offset %d, err %v", offset, err) - if err == nil || curAttempt == maxAttempts { - break + select { + case <-stopCh: + return + case chunk, more := <-chunkUploadCh: + if !more { + return + } + logrus.Infof("Uploading chunk with offset : %d, attempt: %d", chunk.offset, chunk.attempt) + err := s.uploadComponent(snap, file, chunk.offset, chunk.size) + logrus.Infof("For chunk upload of offset %d, err %v", chunk.offset, err) + errCh <- chunkUploadResult{ + err: err, + chunk: &chunk, + } } - delayTime := (1 << curAttempt) - curAttempt++ - logrus.Warnf("Will try to upload chunk with offset: %d at attempt %d after %d seconds", offset, curAttempt, delayTime) - time.Sleep((time.Duration)(delayTime) * time.Second) - } - - errCh <- chunkUploadError{ - err: err, - offset: offset, } } diff --git a/pkg/snapstore/local_snapstore.go b/pkg/snapstore/local_snapstore.go index af24215c3..0da96fac1 100644 --- a/pkg/snapstore/local_snapstore.go +++ b/pkg/snapstore/local_snapstore.go @@ -26,7 +26,6 @@ import ( // LocalSnapStore is snapstore with local disk as backend type LocalSnapStore struct { - SnapStore prefix string } diff --git a/pkg/snapstore/s3_snapstore.go b/pkg/snapstore/s3_snapstore.go index 0a7087613..26bcd0c94 100644 --- a/pkg/snapstore/s3_snapstore.go +++ b/pkg/snapstore/s3_snapstore.go @@ -23,7 +23,6 @@ import ( "os" "path" "sort" - "strings" "sync" "time" @@ -34,49 +33,48 @@ import ( "github.com/sirupsen/logrus" ) -const ( - tmpDir = "/tmp" - tmpBackupFilePrefix = "etcd-backup-" -) - const ( s3NoOfChunk int64 = 10000 //Default configuration in swift installation ) // S3SnapStore is snapstore with local disk as backend type S3SnapStore struct { - SnapStore prefix string client s3iface.S3API bucket string multiPart sync.Mutex + // maxParallelChunkUploads hold the maximum number of parallel chunk uploads allowed. + maxParallelChunkUploads int + tempDir string } // NewS3SnapStore create new S3SnapStore from shared configuration with specified bucket -func NewS3SnapStore(bucket, prefix string) (*S3SnapStore, error) { - return NewS3FromSessionOpt(bucket, prefix, session.Options{ +func NewS3SnapStore(bucket, prefix, tempDir string, maxParallelChunkUploads int) (*S3SnapStore, error) { + return newS3FromSessionOpt(bucket, prefix, tempDir, maxParallelChunkUploads, session.Options{ // Setting this is equal to the AWS_SDK_LOAD_CONFIG environment variable was set. // We want to save the work to set AWS_SDK_LOAD_CONFIG=1 outside. SharedConfigState: session.SharedConfigEnable, }) } -// NewS3FromSessionOpt will create the new S3 snapstore object from S3 session options -func NewS3FromSessionOpt(bucket, prefix string, so session.Options) (*S3SnapStore, error) { +// newS3FromSessionOpt will create the new S3 snapstore object from S3 session options +func newS3FromSessionOpt(bucket, prefix, tempDir string, maxParallelChunkUploads int, so session.Options) (*S3SnapStore, error) { sess, err := session.NewSessionWithOptions(so) if err != nil { return nil, fmt.Errorf("new AWS session failed: %v", err) } cli := s3.New(sess) - return NewS3FromClient(bucket, prefix, cli), nil + return NewS3FromClient(bucket, prefix, tempDir, maxParallelChunkUploads, cli), nil } // NewS3FromClient will create the new S3 snapstore object from S3 client -func NewS3FromClient(bucket, prefix string, cli s3iface.S3API) *S3SnapStore { +func NewS3FromClient(bucket, prefix, tempDir string, maxParallelChunkUploads int, cli s3iface.S3API) *S3SnapStore { return &S3SnapStore{ - bucket: bucket, - prefix: prefix, - client: cli, + bucket: bucket, + prefix: prefix, + client: cli, + maxParallelChunkUploads: maxParallelChunkUploads, + tempDir: tempDir, } } @@ -89,14 +87,12 @@ func (s *S3SnapStore) Fetch(snap Snapshot) (io.ReadCloser, error) { if err != nil { return nil, err } - return resp.Body, nil } // Save will write the snapshot to store func (s *S3SnapStore) Save(snap Snapshot, r io.Reader) error { - // since s3 requires io.ReadSeeker, this is the required hack. - tmpfile, err := ioutil.TempFile(tmpDir, tmpBackupFilePrefix) + tmpfile, err := ioutil.TempFile(s.tempDir, tmpBackupFilePrefix) if err != nil { return fmt.Errorf("failed to create snapshot tempfile: %v", err) } @@ -125,23 +121,43 @@ func (s *S3SnapStore) Save(snap Snapshot, r io.Reader) error { return fmt.Errorf("failed to initiate multipart upload %v", err) } logrus.Infof("Successfully initiated the multipart upload with upload ID : %s", *uploadOutput.UploadId) + var ( - errCh = make(chan chunkUploadError) chunkSize = int64(math.Max(float64(minChunkSize), float64(size/s3NoOfChunk))) noOfChunks = size / chunkSize ) if size%chunkSize != 0 { noOfChunks++ } + var ( + completedParts = make([]*s3.CompletedPart, noOfChunks) + chunkUploadCh = make(chan chunk, noOfChunks) + resCh = make(chan chunkUploadResult, noOfChunks) + wg sync.WaitGroup + cancelCh = make(chan struct{}) + ) - completedParts := make([]*s3.CompletedPart, noOfChunks) + for i := 0; i < s.maxParallelChunkUploads; i++ { + wg.Add(1) + go s.partUploader(&wg, cancelCh, &snap, tmpfile, uploadOutput.UploadId, completedParts, chunkUploadCh, resCh) + } logrus.Infof("Uploading snapshot of size: %d, chunkSize: %d, noOfChunks: %d", size, chunkSize, noOfChunks) - for offset := int64(0); offset <= size; offset += int64(chunkSize) { - go retryPartUpload(s, &snap, tmpfile, uploadOutput.UploadId, completedParts, offset, chunkSize, errCh) + + for offset, index := int64(0), 1; offset <= size; offset += int64(chunkSize) { + newChunk := chunk{ + id: index, + offset: offset, + size: chunkSize, + } + logrus.Debugf("Triggering chunk upload for offset: %d", offset) + chunkUploadCh <- newChunk + index++ } + logrus.Infof("Triggered chunk upload for all chunks, total: %d", noOfChunks) + snapshotErr := collectChunkUploadError(chunkUploadCh, resCh, cancelCh, noOfChunks) + wg.Wait() - snapshotErr := collectChunkUploadError(errCh, noOfChunks) - if len(snapshotErr) != 0 { + if snapshotErr != nil { ctx := context.TODO() ctx, cancel := context.WithTimeout(ctx, chunkUploadTimeout) defer cancel() @@ -167,20 +183,15 @@ func (s *S3SnapStore) Save(snap Snapshot, r io.Reader) error { } if err != nil { - return err + return fmt.Errorf("failed completing snapshot upload with error %v", err) } - if len(snapshotErr) == 0 { - return nil - } - - var collectedErr []string - for _, chunkErr := range snapshotErr { - collectedErr = append(collectedErr, fmt.Sprintf("failed uploading chunk with offset %d with error %v", chunkErr.offset, chunkErr.err)) + if snapshotErr != nil { + return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err) } - return fmt.Errorf(strings.Join(collectedErr, "\n")) + return nil } -func uploadPart(s *S3SnapStore, snap *Snapshot, file *os.File, uploadID *string, completedParts []*s3.CompletedPart, offset, chunkSize int64) error { +func (s *S3SnapStore) uploadPart(snap *Snapshot, file *os.File, uploadID *string, completedParts []*s3.CompletedPart, offset, chunkSize int64) error { fileInfo, err := file.Stat() if err != nil { return err @@ -213,30 +224,25 @@ func uploadPart(s *S3SnapStore, snap *Snapshot, file *os.File, uploadID *string, return err } -func retryPartUpload(s *S3SnapStore, snap *Snapshot, file *os.File, uploadID *string, completedParts []*s3.CompletedPart, offset, chunkSize int64, errCh chan<- chunkUploadError) { - var ( - maxAttempts uint = 5 - curAttempt uint = 1 - err error - ) +func (s *S3SnapStore) partUploader(wg *sync.WaitGroup, stopCh <-chan struct{}, snap *Snapshot, file *os.File, uploadID *string, completedParts []*s3.CompletedPart, chunkUploadCh <-chan chunk, errCh chan<- chunkUploadResult) { + defer wg.Done() for { - logrus.Infof("Uploading chunk with offset : %d, attempt: %d", offset, curAttempt) - err = uploadPart(s, snap, file, uploadID, completedParts, offset, chunkSize) - logrus.Infof("For chunk upload of offset %d, err %v", offset, err) - if err == nil || curAttempt == maxAttempts { - break + select { + case <-stopCh: + return + case chunk, more := <-chunkUploadCh: + if !more { + return + } + logrus.Infof("Uploading chunk with id: %d, offset: %d, attempt: %d", chunk.id, chunk.offset, chunk.attempt) + err := s.uploadPart(snap, file, uploadID, completedParts, chunk.offset, chunk.size) + logrus.Infof("For chunk upload of id: %d, offset: %d, error: %v", chunk.id, chunk.offset, err) + errCh <- chunkUploadResult{ + err: err, + chunk: &chunk, + } } - delayTime := (1 << curAttempt) - curAttempt++ - logrus.Warnf("Will try to upload chunk with offset: %d at attempt %d after %d seconds", offset, curAttempt, delayTime) - time.Sleep((time.Duration)(delayTime) * time.Second) - } - - errCh <- chunkUploadError{ - err: err, - offset: offset, } - return } // List will list the snapshots from store diff --git a/pkg/snapstore/snapstore_test.go b/pkg/snapstore/snapstore_test.go index 82d7e18f6..ae5d8c397 100644 --- a/pkg/snapstore/snapstore_test.go +++ b/pkg/snapstore/snapstore_test.go @@ -77,7 +77,7 @@ var _ = Describe("Snapstore", func() { multiPartUploads: map[string]*[][]byte{}, } snapstores = map[string]SnapStore{ - "s3": NewS3FromClient(bucket, prefix, &m), + "s3": NewS3FromClient(bucket, prefix, "/tmp", 5, &m), } }) diff --git a/pkg/snapstore/swift_snapstore.go b/pkg/snapstore/swift_snapstore.go index 6fb87d5f1..e5379a4e0 100644 --- a/pkg/snapstore/swift_snapstore.go +++ b/pkg/snapstore/swift_snapstore.go @@ -24,7 +24,7 @@ import ( "path" "sort" "strings" - "time" + "sync" "github.com/sirupsen/logrus" @@ -36,10 +36,12 @@ import ( // SwiftSnapStore is snapstore with Openstack Swift as backend type SwiftSnapStore struct { - SnapStore prefix string client *gophercloud.ServiceClient bucket string + // maxParallelChunkUploads hold the maximum number of parallel chunk uploads allowed. + maxParallelChunkUploads int + tempDir string } const ( @@ -47,7 +49,7 @@ const ( ) // NewSwiftSnapStore create new SwiftSnapStore from shared configuration with specified bucket -func NewSwiftSnapStore(bucket, prefix string) (*SwiftSnapStore, error) { +func NewSwiftSnapStore(bucket, prefix, tempDir string, maxParallelChunkUploads int) (*SwiftSnapStore, error) { authOpts, err := openstack.AuthOptionsFromEnv() if err != nil { return nil, err @@ -62,12 +64,20 @@ func NewSwiftSnapStore(bucket, prefix string) (*SwiftSnapStore, error) { return nil, err } - return &SwiftSnapStore{ - prefix: prefix, - client: client, - bucket: bucket, - }, nil + return NewSwiftSnapstoreFromClient(bucket, prefix, tempDir, maxParallelChunkUploads, client), nil + +} + +// NewSwiftSnapstoreFromClient will create the new Swift snapstore object from Swift client +func NewSwiftSnapstoreFromClient(bucket, prefix, tempDir string, maxParallelChunkUploads int, cli *gophercloud.ServiceClient) *SwiftSnapStore { + return &SwiftSnapStore{ + bucket: bucket, + prefix: prefix, + client: cli, + maxParallelChunkUploads: maxParallelChunkUploads, + tempDir: tempDir, + } } // Fetch should open reader for the snapshot file from store @@ -79,7 +89,7 @@ func (s *SwiftSnapStore) Fetch(snap Snapshot) (io.ReadCloser, error) { // Save will write the snapshot to store func (s *SwiftSnapStore) Save(snap Snapshot, r io.Reader) error { // Save it locally - tmpfile, err := ioutil.TempFile(tmpDir, tmpBackupFilePrefix) + tmpfile, err := ioutil.TempFile(s.tempDir, tmpBackupFilePrefix) if err != nil { return fmt.Errorf("failed to create snapshot tempfile: %v", err) } @@ -93,7 +103,6 @@ func (s *SwiftSnapStore) Save(snap Snapshot, r io.Reader) error { } var ( - errCh = make(chan chunkUploadError) chunkSize = int64(math.Max(float64(minChunkSize), float64(size/swiftNoOfChunk))) noOfChunks = size / chunkSize ) @@ -101,13 +110,34 @@ func (s *SwiftSnapStore) Save(snap Snapshot, r io.Reader) error { noOfChunks++ } + var ( + chunkUploadCh = make(chan chunk, noOfChunks) + resCh = make(chan chunkUploadResult, noOfChunks) + wg sync.WaitGroup + cancelCh = make(chan struct{}) + ) + + for i := 0; i < s.maxParallelChunkUploads; i++ { + wg.Add(1) + go s.chunkUploader(&wg, cancelCh, &snap, tmpfile, chunkUploadCh, resCh) + } + logrus.Infof("Uploading snapshot of size: %d, chunkSize: %d, noOfChunks: %d", size, chunkSize, noOfChunks) - for offset := int64(0); offset <= size; offset += int64(chunkSize) { - go retryChunkUpload(s, &snap, tmpfile, offset, chunkSize, errCh) + for offset, index := int64(0), 1; offset <= size; offset += int64(chunkSize) { + newChunk := chunk{ + id: index, + offset: offset, + size: chunkSize, + } + chunkUploadCh <- newChunk + index++ } + logrus.Infof("Triggered chunk upload for all chunks, total: %d", noOfChunks) - snapshotErr := collectChunkUploadError(errCh, noOfChunks) - if len(snapshotErr) == 0 { + snapshotErr := collectChunkUploadError(chunkUploadCh, resCh, cancelCh, noOfChunks) + wg.Wait() + + if snapshotErr == nil { logrus.Info("All chunk uploaded successfully. Uploading manifest.") b := make([]byte, 0) opts := objects.CreateOpts{ @@ -115,17 +145,17 @@ func (s *SwiftSnapStore) Save(snap Snapshot, r io.Reader) error { ContentLength: chunkSize, ObjectManifest: path.Join(s.bucket, s.prefix, snap.SnapDir, snap.SnapName), } - res := objects.Create(s.client, s.bucket, path.Join(s.prefix, snap.SnapDir, snap.SnapName), opts) - return res.Err - } - var collectedErr []string - for _, chunkErr := range snapshotErr { - collectedErr = append(collectedErr, fmt.Sprintf("failed uploading chunk with offset %d with error %v", chunkErr.offset, chunkErr.err)) + if res := objects.Create(s.client, s.bucket, path.Join(s.prefix, snap.SnapDir, snap.SnapName), opts); res.Err != nil { + return fmt.Errorf("failed uploading manifest for snapshot with error: %v", res.Err) + } + logrus.Info("Manifest object uploaded successfully.") + return nil } - return fmt.Errorf(strings.Join(collectedErr, "\n")) + + return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err) } -func uploadChunk(s *SwiftSnapStore, snap *Snapshot, file *os.File, offset, chunkSize int64) error { +func (s *SwiftSnapStore) uploadChunk(snap *Snapshot, file *os.File, offset, chunkSize int64) error { fileInfo, err := file.Stat() if err != nil { return err @@ -146,35 +176,29 @@ func uploadChunk(s *SwiftSnapStore, snap *Snapshot, file *os.File, offset, chunk return res.Err } -func retryChunkUpload(s *SwiftSnapStore, snap *Snapshot, file *os.File, offset, chunkSize int64, errCh chan<- chunkUploadError) { - var ( - maxAttempts uint = 5 - curAttempt uint = 1 - err error - ) +func (s *SwiftSnapStore) chunkUploader(wg *sync.WaitGroup, stopCh <-chan struct{}, snap *Snapshot, file *os.File, chunkUploadCh chan chunk, errCh chan<- chunkUploadResult) { + defer wg.Done() for { - logrus.Infof("Uploading chunk with offset : %d, attempt: %d", offset, curAttempt) - err = uploadChunk(s, snap, file, offset, chunkSize) - logrus.Infof("For chunk upload of offset %d, err %v", offset, err) - if err == nil || curAttempt == maxAttempts { - break + select { + case <-stopCh: + return + case chunk, more := <-chunkUploadCh: + if !more { + return + } + logrus.Infof("Uploading chunk with offset : %d, attempt: %d", chunk.offset, chunk.attempt) + err := s.uploadChunk(snap, file, chunk.offset, chunk.size) + logrus.Infof("For chunk upload of offset %d, err %v", chunk.offset, err) + errCh <- chunkUploadResult{ + err: nil, + chunk: &chunk, + } } - delayTime := (1 << curAttempt) - curAttempt++ - logrus.Warnf("Will try to upload chunk with offset: %d at attempt %d after %d seconds", offset, curAttempt, delayTime) - time.Sleep((time.Duration)(delayTime) * time.Second) } - - errCh <- chunkUploadError{ - err: err, - offset: offset, - } - return } // List will list the snapshots from store func (s *SwiftSnapStore) List() (SnapList, error) { - opts := &objects.ListOpts{ Full: false, Prefix: s.prefix, diff --git a/pkg/snapstore/types.go b/pkg/snapstore/types.go index db1ae56fb..e834d1efe 100644 --- a/pkg/snapstore/types.go +++ b/pkg/snapstore/types.go @@ -52,8 +52,14 @@ const ( SnapshotKindFull = "Full" // SnapshotKindDelta is constant for delta snapshot kind SnapshotKindDelta = "Incr" + // ChunkUploadTimeout is timeout for uploading chunk chunkUploadTimeout = 180 * time.Second + + tmpBackupFilePrefix = "etcd-backup-" + + // maxRetryAttempts indicates the number of attempts to be retried in case of failure to upload chunk. + maxRetryAttempts = 5 ) // Snapshot structure represents the metadata of snapshot @@ -70,17 +76,27 @@ type Snapshot struct { // SnapList is list of snapshots type SnapList []*Snapshot -// Config defines the configuration to create snapshot store +// Config defines the configuration to create snapshot store. type Config struct { - // Provider indicated the cloud provider + // Provider indicated the cloud provider. Provider string - // Container holds the name of bucket or container to which snapshot will be stored + // Container holds the name of bucket or container to which snapshot will be stored. Container string - // Prefix holds the prefix or directory under StorageContainer under which snapshot will be stored + // Prefix holds the prefix or directory under StorageContainer under which snapshot will be stored. Prefix string + // MaxParallelChunkUploads hold the maximum number of parallel chunk uploads allowed. + MaxParallelChunkUploads int + // Temporay Directory + TempDir string } -type chunkUploadError struct { - err error - offset int64 +type chunk struct { + offset int64 + size int64 + attempt uint + id int +} +type chunkUploadResult struct { + err error + chunk *chunk } diff --git a/pkg/snapstore/utils.go b/pkg/snapstore/utils.go index a66bea36a..fea2b6155 100644 --- a/pkg/snapstore/utils.go +++ b/pkg/snapstore/utils.go @@ -18,6 +18,7 @@ import ( "fmt" "os" "path" + "time" "github.com/sirupsen/logrus" ) @@ -32,6 +33,9 @@ func GetSnapstore(config *Config) (SnapStore, error) { if config.Container == "" { config.Container = os.Getenv(envStorageContainer) } + if config.MaxParallelChunkUploads <= 0 { + config.MaxParallelChunkUploads = 5 + } switch config.Provider { case SnapstoreProviderLocal, "": if config.Container == "" { @@ -42,22 +46,22 @@ func GetSnapstore(config *Config) (SnapStore, error) { if config.Container == "" { return nil, fmt.Errorf("storage container name not specified") } - return NewS3SnapStore(config.Container, config.Prefix) + return NewS3SnapStore(config.Container, config.Prefix, config.TempDir, config.MaxParallelChunkUploads) case SnapstoreProviderABS: if config.Container == "" { return nil, fmt.Errorf("storage container name not specified") } - return NewABSSnapStore(config.Container, config.Prefix) + return NewABSSnapStore(config.Container, config.Prefix, config.TempDir, config.MaxParallelChunkUploads) case SnapstoreProviderGCS: if config.Container == "" { return nil, fmt.Errorf("storage container name not specified") } - return NewGCSSnapStore(config.Container, config.Prefix) + return NewGCSSnapStore(config.Container, config.Prefix, config.TempDir, config.MaxParallelChunkUploads) case SnapstoreProviderSwift: if config.Container == "" { return nil, fmt.Errorf("storage container name not specified") } - return NewSwiftSnapStore(config.Container, config.Prefix) + return NewSwiftSnapStore(config.Container, config.Prefix, config.TempDir, config.MaxParallelChunkUploads) default: return nil, fmt.Errorf("unsupported storage provider : %s", config.Provider) @@ -76,18 +80,35 @@ func GetEnvVarOrError(varName string) (string, error) { } // collectChunkUploadError collects the error from all go routine to upload individual chunks -func collectChunkUploadError(errCh chan chunkUploadError, noOfChunks int64) []chunkUploadError { - var snapshotErr []chunkUploadError +func collectChunkUploadError(chunkUploadCh chan<- chunk, resCh <-chan chunkUploadResult, stopCh chan struct{}, noOfChunks int64) *chunkUploadResult { remainingChunks := noOfChunks logrus.Infof("No of Chunks:= %d", noOfChunks) - for { - chunkErr := <-errCh - if chunkErr.err != nil { - snapshotErr = append(snapshotErr, chunkErr) + for chunkRes := range resCh { + logrus.Infof("Received chunk result for id: %d, offset: %d", chunkRes.chunk.id, chunkRes.chunk.offset) + if chunkRes.err != nil { + if chunkRes.chunk.attempt == maxRetryAttempts { + logrus.Errorf("Received the chunk upload error from one of the workers. Sending stop signal to all workers.") + close(stopCh) + return &chunkRes + } + chunk := chunkRes.chunk + delayTime := (1 << chunk.attempt) + chunk.attempt++ + logrus.Warnf("Will try to upload chunk id:%d, offset: %d at attempt %d after %d seconds", chunk.id, chunk.offset, chunk.attempt, delayTime) + time.AfterFunc(time.Duration(delayTime)*time.Second, func() { + select { + case <-stopCh: + return + default: + chunkUploadCh <- *chunk + } + }) } remainingChunks-- if remainingChunks == 0 { - return snapshotErr + close(stopCh) + break } } + return nil } From a19ec640fade6daf8554e3cb1e5abe281eac013a Mon Sep 17 00:00:00 2001 From: Swapnil Mhamane Date: Tue, 27 Nov 2018 14:05:25 +0530 Subject: [PATCH 2/3] Fix the error handling for chunk on azure, swift Signed-off-by: Swapnil Mhamane --- pkg/snapstore/abs_snapstore.go | 43 ++++++++++++-------------- pkg/snapstore/gcs_snapstore.go | 44 +++++++++++++-------------- pkg/snapstore/s3_snapstore.go | 32 ++++++++++--------- pkg/snapstore/s3_snapstore_test.go | 49 ++++++++++++++++++++++++++++++ pkg/snapstore/swift_snapstore.go | 36 +++++++++++----------- pkg/snapstore/types.go | 2 +- pkg/snapstore/utils.go | 17 ++++++----- 7 files changed, 135 insertions(+), 88 deletions(-) diff --git a/pkg/snapstore/abs_snapstore.go b/pkg/snapstore/abs_snapstore.go index 0596651a6..19d182956 100644 --- a/pkg/snapstore/abs_snapstore.go +++ b/pkg/snapstore/abs_snapstore.go @@ -165,26 +165,25 @@ func (a *ABSSnapStore) Save(snap Snapshot, r io.Reader) error { snapshotErr := collectChunkUploadError(chunkUploadCh, resCh, cancelCh, noOfChunks) wg.Wait() - if snapshotErr == nil { - logrus.Info("All chunk uploaded successfully. Uploading blocklist.") - blobName := path.Join(a.prefix, snap.SnapDir, snap.SnapName) - blob := a.container.GetBlobReference(blobName) - var blockList []storage.Block - for partNumber := int64(1); partNumber <= noOfChunks; partNumber++ { - block := storage.Block{ - ID: base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%010d", partNumber))), - Status: storage.BlockStatusUncommitted, - } - blockList = append(blockList, block) - } - if err := blob.PutBlockList(blockList, &storage.PutBlockListOptions{}); err != nil { - return fmt.Errorf("failed uploading blocklist for snapshot with error: %v", err) + if snapshotErr != nil { + return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err) + } + logrus.Info("All chunk uploaded successfully. Uploading blocklist.") + blobName := path.Join(a.prefix, snap.SnapDir, snap.SnapName) + blob := a.container.GetBlobReference(blobName) + var blockList []storage.Block + for partNumber := int64(1); partNumber <= noOfChunks; partNumber++ { + block := storage.Block{ + ID: base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%010d", partNumber))), + Status: storage.BlockStatusUncommitted, } - logrus.Info("Blocklist uploaded successfully.") - return nil + blockList = append(blockList, block) } - - return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err) + if err := blob.PutBlockList(blockList, &storage.PutBlockListOptions{}); err != nil { + return fmt.Errorf("failed uploading blocklist for snapshot with error: %v", err) + } + logrus.Info("Blocklist uploaded successfully.") + return nil } func (a *ABSSnapStore) uploadBlock(snap *Snapshot, file *os.File, offset, chunkSize int64) error { @@ -213,18 +212,16 @@ func (a *ABSSnapStore) blockUploader(wg *sync.WaitGroup, stopCh <-chan struct{}, select { case <-stopCh: return - case chunk, more := <-chunkUploadCh: - if !more { + case chunk, ok := <-chunkUploadCh: + if !ok { return } logrus.Infof("Uploading chunk with offset : %d, attempt: %d", chunk.offset, chunk.attempt) err := a.uploadBlock(snap, file, chunk.offset, chunk.size) - logrus.Infof("For chunk upload of offset %d, err %v", chunk.offset, err) errCh <- chunkUploadResult{ - err: nil, + err: err, chunk: &chunk, } - } } } diff --git a/pkg/snapstore/gcs_snapstore.go b/pkg/snapstore/gcs_snapstore.go index 913d22b43..19d7f1765 100644 --- a/pkg/snapstore/gcs_snapstore.go +++ b/pkg/snapstore/gcs_snapstore.go @@ -122,28 +122,27 @@ func (s *GCSSnapStore) Save(snap Snapshot, r io.Reader) error { snapshotErr := collectChunkUploadError(chunkUploadCh, resCh, cancelCh, noOfChunks) wg.Wait() - if snapshotErr == nil { - logrus.Info("All chunk uploaded successfully. Uploading composite object.") - bh := s.client.Bucket(s.bucket) - var subObjects []*storage.ObjectHandle - for partNumber := int64(1); partNumber <= noOfChunks; partNumber++ { - name := path.Join(s.prefix, snap.SnapDir, snap.SnapName, fmt.Sprintf("%010d", partNumber)) - obj := bh.Object(name) - subObjects = append(subObjects, obj) - } - name := path.Join(s.prefix, snap.SnapDir, snap.SnapName) + if snapshotErr != nil { + return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err) + } + logrus.Info("All chunk uploaded successfully. Uploading composite object.") + bh := s.client.Bucket(s.bucket) + var subObjects []*storage.ObjectHandle + for partNumber := int64(1); partNumber <= noOfChunks; partNumber++ { + name := path.Join(s.prefix, snap.SnapDir, snap.SnapName, fmt.Sprintf("%010d", partNumber)) obj := bh.Object(name) - c := obj.ComposerFrom(subObjects...) - ctx, cancel := context.WithTimeout(context.TODO(), chunkUploadTimeout) - defer cancel() - if _, err := c.Run(ctx); err != nil { - return fmt.Errorf("failed uploading composite object for snapshot with error: %v", err) - } - logrus.Info("Composite object uploaded successfully.") - return nil + subObjects = append(subObjects, obj) } - - return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err) + name := path.Join(s.prefix, snap.SnapDir, snap.SnapName) + obj := bh.Object(name) + c := obj.ComposerFrom(subObjects...) + ctx, cancel := context.WithTimeout(context.TODO(), chunkUploadTimeout) + defer cancel() + if _, err := c.Run(ctx); err != nil { + return fmt.Errorf("failed uploading composite object for snapshot with error: %v", err) + } + logrus.Info("Composite object uploaded successfully.") + return nil } func (s *GCSSnapStore) uploadComponent(snap *Snapshot, file *os.File, offset, chunkSize int64) error { @@ -177,13 +176,12 @@ func (s *GCSSnapStore) componentUploader(wg *sync.WaitGroup, stopCh <-chan struc select { case <-stopCh: return - case chunk, more := <-chunkUploadCh: - if !more { + case chunk, ok := <-chunkUploadCh: + if !ok { return } logrus.Infof("Uploading chunk with offset : %d, attempt: %d", chunk.offset, chunk.attempt) err := s.uploadComponent(snap, file, chunk.offset, chunk.size) - logrus.Infof("For chunk upload of offset %d, err %v", chunk.offset, err) errCh <- chunkUploadResult{ err: err, chunk: &chunk, diff --git a/pkg/snapstore/s3_snapstore.go b/pkg/snapstore/s3_snapstore.go index 26bcd0c94..28dc84063 100644 --- a/pkg/snapstore/s3_snapstore.go +++ b/pkg/snapstore/s3_snapstore.go @@ -230,13 +230,12 @@ func (s *S3SnapStore) partUploader(wg *sync.WaitGroup, stopCh <-chan struct{}, s select { case <-stopCh: return - case chunk, more := <-chunkUploadCh: - if !more { + case chunk, ok := <-chunkUploadCh: + if !ok { return } logrus.Infof("Uploading chunk with id: %d, offset: %d, attempt: %d", chunk.id, chunk.offset, chunk.attempt) err := s.uploadPart(snap, file, uploadID, completedParts, chunk.offset, chunk.size) - logrus.Infof("For chunk upload of id: %d, offset: %d, error: %v", chunk.id, chunk.offset, err) errCh <- chunkUploadResult{ err: err, chunk: &chunk, @@ -247,25 +246,28 @@ func (s *S3SnapStore) partUploader(wg *sync.WaitGroup, stopCh <-chan struct{}, s // List will list the snapshots from store func (s *S3SnapStore) List() (SnapList, error) { - resp, err := s.client.ListObjects(&s3.ListObjectsInput{ + var snapList SnapList + in := &s3.ListObjectsInput{ Bucket: aws.String(s.bucket), Prefix: aws.String(fmt.Sprintf("%s/", s.prefix)), + } + err := s.client.ListObjectsPages(in, func(page *s3.ListObjectsOutput, lastPage bool) bool { + for _, key := range page.Contents { + k := (*key.Key)[len(*page.Prefix):] + snap, err := ParseSnapshot(k) + if err != nil { + // Warning + logrus.Warnf("Invalid snapshot found. Ignoring it: %s", k) + } else { + snapList = append(snapList, snap) + } + } + return !lastPage }) if err != nil { return nil, err } - var snapList SnapList - for _, key := range resp.Contents { - k := (*key.Key)[len(*resp.Prefix):] - snap, err := ParseSnapshot(k) - if err != nil { - // Warning - logrus.Warnf("Invalid snapshot found. Ignoring it:%s\n", k) - } else { - snapList = append(snapList, snap) - } - } sort.Sort(snapList) return snapList, nil } diff --git a/pkg/snapstore/s3_snapstore_test.go b/pkg/snapstore/s3_snapstore_test.go index a77270716..e00709b6b 100644 --- a/pkg/snapstore/s3_snapstore_test.go +++ b/pkg/snapstore/s3_snapstore_test.go @@ -19,6 +19,7 @@ import ( "fmt" "io" "io/ioutil" + "sort" "strings" "time" @@ -148,6 +149,54 @@ func (m *mockS3Client) ListObjects(in *s3.ListObjectsInput) (*s3.ListObjectsOutp return out, nil } +// ListObject returns the objects from map for mock test +func (m *mockS3Client) ListObjectsPages(in *s3.ListObjectsInput, callback func(*s3.ListObjectsOutput, bool) bool) error { + var ( + count int64 = 0 + limit int64 = 1000 + lastPage bool = false + keys []string + out = &s3.ListObjectsOutput{ + Prefix: in.Prefix, + Contents: make([]*s3.Object, 0), + } + ) + if in.MaxKeys != nil { + limit = *in.MaxKeys + } + for key := range m.objects { + keys = append(keys, key) + } + sort.Strings(keys) + + for index, key := range keys { + if strings.HasPrefix(key, *in.Prefix) { + keyPtr := new(string) + *keyPtr = key + tempObj := &s3.Object{ + Key: keyPtr, + } + out.Contents = append(out.Contents, tempObj) + count++ + } + if index == len(keys)-1 { + lastPage = true + } + if count == limit || lastPage { + if !callback(out, lastPage) { + return nil + } + count = 0 + out = &s3.ListObjectsOutput{ + Prefix: in.Prefix, + Contents: make([]*s3.Object, 0), + NextMarker: &key, + } + } + } + return nil +} + // DeleteObject deletes the object from map for mock test func (m *mockS3Client) DeleteObject(in *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) { delete(m.objects, *in.Key) diff --git a/pkg/snapstore/swift_snapstore.go b/pkg/snapstore/swift_snapstore.go index e5379a4e0..a1a4110e9 100644 --- a/pkg/snapstore/swift_snapstore.go +++ b/pkg/snapstore/swift_snapstore.go @@ -137,22 +137,21 @@ func (s *SwiftSnapStore) Save(snap Snapshot, r io.Reader) error { snapshotErr := collectChunkUploadError(chunkUploadCh, resCh, cancelCh, noOfChunks) wg.Wait() - if snapshotErr == nil { - logrus.Info("All chunk uploaded successfully. Uploading manifest.") - b := make([]byte, 0) - opts := objects.CreateOpts{ - Content: bytes.NewReader(b), - ContentLength: chunkSize, - ObjectManifest: path.Join(s.bucket, s.prefix, snap.SnapDir, snap.SnapName), - } - if res := objects.Create(s.client, s.bucket, path.Join(s.prefix, snap.SnapDir, snap.SnapName), opts); res.Err != nil { - return fmt.Errorf("failed uploading manifest for snapshot with error: %v", res.Err) - } - logrus.Info("Manifest object uploaded successfully.") - return nil + if snapshotErr != nil { + return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err) } - - return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err) + logrus.Info("All chunk uploaded successfully. Uploading manifest.") + b := make([]byte, 0) + opts := objects.CreateOpts{ + Content: bytes.NewReader(b), + ContentLength: chunkSize, + ObjectManifest: path.Join(s.bucket, s.prefix, snap.SnapDir, snap.SnapName), + } + if res := objects.Create(s.client, s.bucket, path.Join(s.prefix, snap.SnapDir, snap.SnapName), opts); res.Err != nil { + return fmt.Errorf("failed uploading manifest for snapshot with error: %v", res.Err) + } + logrus.Info("Manifest object uploaded successfully.") + return nil } func (s *SwiftSnapStore) uploadChunk(snap *Snapshot, file *os.File, offset, chunkSize int64) error { @@ -182,15 +181,14 @@ func (s *SwiftSnapStore) chunkUploader(wg *sync.WaitGroup, stopCh <-chan struct{ select { case <-stopCh: return - case chunk, more := <-chunkUploadCh: - if !more { + case chunk, ok := <-chunkUploadCh: + if !ok { return } logrus.Infof("Uploading chunk with offset : %d, attempt: %d", chunk.offset, chunk.attempt) err := s.uploadChunk(snap, file, chunk.offset, chunk.size) - logrus.Infof("For chunk upload of offset %d, err %v", chunk.offset, err) errCh <- chunkUploadResult{ - err: nil, + err: err, chunk: &chunk, } } diff --git a/pkg/snapstore/types.go b/pkg/snapstore/types.go index e834d1efe..d874c85c0 100644 --- a/pkg/snapstore/types.go +++ b/pkg/snapstore/types.go @@ -86,7 +86,7 @@ type Config struct { Prefix string // MaxParallelChunkUploads hold the maximum number of parallel chunk uploads allowed. MaxParallelChunkUploads int - // Temporay Directory + // Temporary Directory TempDir string } diff --git a/pkg/snapstore/utils.go b/pkg/snapstore/utils.go index fea2b6155..092eae14d 100644 --- a/pkg/snapstore/utils.go +++ b/pkg/snapstore/utils.go @@ -86,15 +86,16 @@ func collectChunkUploadError(chunkUploadCh chan<- chunk, resCh <-chan chunkUploa for chunkRes := range resCh { logrus.Infof("Received chunk result for id: %d, offset: %d", chunkRes.chunk.id, chunkRes.chunk.offset) if chunkRes.err != nil { + logrus.Infof("Chunk upload failed for id: %d, offset: %d with err: %v", chunkRes.chunk.id, chunkRes.chunk.offset, chunkRes.err) if chunkRes.chunk.attempt == maxRetryAttempts { - logrus.Errorf("Received the chunk upload error from one of the workers. Sending stop signal to all workers.") + logrus.Errorf("Received the chunk upload error even after %d attempts from one of the workers. Sending stop signal to all workers.", chunkRes.chunk.attempt) close(stopCh) return &chunkRes } chunk := chunkRes.chunk delayTime := (1 << chunk.attempt) chunk.attempt++ - logrus.Warnf("Will try to upload chunk id:%d, offset: %d at attempt %d after %d seconds", chunk.id, chunk.offset, chunk.attempt, delayTime) + logrus.Warnf("Will try to upload chunk id: %d, offset: %d at attempt %d after %d seconds", chunk.id, chunk.offset, chunk.attempt, delayTime) time.AfterFunc(time.Duration(delayTime)*time.Second, func() { select { case <-stopCh: @@ -103,11 +104,13 @@ func collectChunkUploadError(chunkUploadCh chan<- chunk, resCh <-chan chunkUploa chunkUploadCh <- *chunk } }) - } - remainingChunks-- - if remainingChunks == 0 { - close(stopCh) - break + } else { + remainingChunks-- + if remainingChunks == 0 { + logrus.Infof("Received successful chunk result for all chunks. Stopping workers.") + close(stopCh) + break + } } } return nil From 19c395fc713133dde1a9a03f1d18c0632cf516e2 Mon Sep 17 00:00:00 2001 From: Swapnil Mhamane Date: Wed, 28 Nov 2018 20:21:43 +0530 Subject: [PATCH 3/3] Fix Garbage collection integration test Signed-off-by: Swapnil Mhamane --- test/e2e/integration/cloud_backup_test.go | 35 +++++++++++------------ 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/test/e2e/integration/cloud_backup_test.go b/test/e2e/integration/cloud_backup_test.go index 2457bdfc2..e6e05fbe3 100644 --- a/test/e2e/integration/cloud_backup_test.go +++ b/test/e2e/integration/cloud_backup_test.go @@ -2,7 +2,6 @@ package integration_test import ( "bufio" - "fmt" "io/ioutil" "net/http" "os" @@ -113,17 +112,6 @@ var _ = Describe("CloudBackup", func() { ) BeforeEach(func() { - cmdEtcd, etcdErrChan = startEtcd() - cmdEtcdbrctl, etcdbrErrChan = startSnapshotter() - go func() { - select { - case err := <-*etcdErrChan: - Expect(err).ShouldNot(HaveOccurred()) - case err := <-*etcdbrErrChan: - Expect(err).ShouldNot(HaveOccurred()) - } - }() - snapstoreConfig := &snapstore.Config{ Provider: "S3", Container: os.Getenv("TEST_ID"), @@ -138,6 +126,17 @@ var _ = Describe("CloudBackup", func() { store.Delete(*snap) } + cmdEtcd, etcdErrChan = startEtcd() + cmdEtcdbrctl, etcdbrErrChan = startSnapshotter() + go func() { + select { + case err := <-*etcdErrChan: + Expect(err).ShouldNot(HaveOccurred()) + case err := <-*etcdbrErrChan: + Expect(err).ShouldNot(HaveOccurred()) + } + }() + }) Context("taken at 1 minute interval", func() { @@ -146,7 +145,7 @@ var _ = Describe("CloudBackup", func() { Expect(snaplist).Should(BeEmpty()) Expect(err).ShouldNot(HaveOccurred()) - time.Sleep(1 * time.Minute) + time.Sleep(70 * time.Second) snaplist, err = store.List() Expect(snaplist).ShouldNot(BeEmpty()) @@ -160,20 +159,20 @@ var _ = Describe("CloudBackup", func() { Expect(snaplist).Should(BeEmpty()) Expect(err).ShouldNot(HaveOccurred()) - time.Sleep(160 * time.Second) + time.Sleep(190 * time.Second) snaplist, err = store.List() Expect(err).ShouldNot(HaveOccurred()) count := 0 - expectedCount := 1 for _, snap := range snaplist { if snap.Kind == snapstore.SnapshotKindFull { count++ } } - if count != expectedCount { - Fail(fmt.Sprintf("number of full snapshots found does not match expected count: %d", expectedCount)) - } + // We don't have control over whether for the lifecycle of test, whether GC ran after + // last snapshot of not. But we are sure that if GC is working, there will be + // max 2 snapshots otherwise 3 full snapshot will be there for 190 second period. + Expect(count).Should(BeNumerically("<=", 2)) }) })