From ff934a810a7838ceb958e01a90814d9513363e74 Mon Sep 17 00:00:00 2001 From: Swapnil Mhamane Date: Tue, 27 Nov 2018 14:05:25 +0530 Subject: [PATCH] Fix the error handling for chunk on azure, swift Signed-off-by: Swapnil Mhamane --- pkg/snapstore/abs_snapstore.go | 43 +++++++++++++++---------------- pkg/snapstore/gcs_snapstore.go | 44 +++++++++++++++----------------- pkg/snapstore/s3_snapstore.go | 32 ++++++++++++----------- pkg/snapstore/swift_snapstore.go | 36 ++++++++++++-------------- pkg/snapstore/types.go | 2 +- pkg/snapstore/utils.go | 17 +++++++----- 6 files changed, 86 insertions(+), 88 deletions(-) diff --git a/pkg/snapstore/abs_snapstore.go b/pkg/snapstore/abs_snapstore.go index 0596651a6..19d182956 100644 --- a/pkg/snapstore/abs_snapstore.go +++ b/pkg/snapstore/abs_snapstore.go @@ -165,26 +165,25 @@ func (a *ABSSnapStore) Save(snap Snapshot, r io.Reader) error { snapshotErr := collectChunkUploadError(chunkUploadCh, resCh, cancelCh, noOfChunks) wg.Wait() - if snapshotErr == nil { - logrus.Info("All chunk uploaded successfully. Uploading blocklist.") - blobName := path.Join(a.prefix, snap.SnapDir, snap.SnapName) - blob := a.container.GetBlobReference(blobName) - var blockList []storage.Block - for partNumber := int64(1); partNumber <= noOfChunks; partNumber++ { - block := storage.Block{ - ID: base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%010d", partNumber))), - Status: storage.BlockStatusUncommitted, - } - blockList = append(blockList, block) - } - if err := blob.PutBlockList(blockList, &storage.PutBlockListOptions{}); err != nil { - return fmt.Errorf("failed uploading blocklist for snapshot with error: %v", err) + if snapshotErr != nil { + return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err) + } + logrus.Info("All chunk uploaded successfully. Uploading blocklist.") + blobName := path.Join(a.prefix, snap.SnapDir, snap.SnapName) + blob := a.container.GetBlobReference(blobName) + var blockList []storage.Block + for partNumber := int64(1); partNumber <= noOfChunks; partNumber++ { + block := storage.Block{ + ID: base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%010d", partNumber))), + Status: storage.BlockStatusUncommitted, } - logrus.Info("Blocklist uploaded successfully.") - return nil + blockList = append(blockList, block) } - - return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err) + if err := blob.PutBlockList(blockList, &storage.PutBlockListOptions{}); err != nil { + return fmt.Errorf("failed uploading blocklist for snapshot with error: %v", err) + } + logrus.Info("Blocklist uploaded successfully.") + return nil } func (a *ABSSnapStore) uploadBlock(snap *Snapshot, file *os.File, offset, chunkSize int64) error { @@ -213,18 +212,16 @@ func (a *ABSSnapStore) blockUploader(wg *sync.WaitGroup, stopCh <-chan struct{}, select { case <-stopCh: return - case chunk, more := <-chunkUploadCh: - if !more { + case chunk, ok := <-chunkUploadCh: + if !ok { return } logrus.Infof("Uploading chunk with offset : %d, attempt: %d", chunk.offset, chunk.attempt) err := a.uploadBlock(snap, file, chunk.offset, chunk.size) - logrus.Infof("For chunk upload of offset %d, err %v", chunk.offset, err) errCh <- chunkUploadResult{ - err: nil, + err: err, chunk: &chunk, } - } } } diff --git a/pkg/snapstore/gcs_snapstore.go b/pkg/snapstore/gcs_snapstore.go index 913d22b43..19d7f1765 100644 --- a/pkg/snapstore/gcs_snapstore.go +++ b/pkg/snapstore/gcs_snapstore.go @@ -122,28 +122,27 @@ func (s *GCSSnapStore) Save(snap Snapshot, r io.Reader) error { snapshotErr := collectChunkUploadError(chunkUploadCh, resCh, cancelCh, noOfChunks) wg.Wait() - if snapshotErr == nil { - logrus.Info("All chunk uploaded successfully. Uploading composite object.") - bh := s.client.Bucket(s.bucket) - var subObjects []*storage.ObjectHandle - for partNumber := int64(1); partNumber <= noOfChunks; partNumber++ { - name := path.Join(s.prefix, snap.SnapDir, snap.SnapName, fmt.Sprintf("%010d", partNumber)) - obj := bh.Object(name) - subObjects = append(subObjects, obj) - } - name := path.Join(s.prefix, snap.SnapDir, snap.SnapName) + if snapshotErr != nil { + return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err) + } + logrus.Info("All chunk uploaded successfully. Uploading composite object.") + bh := s.client.Bucket(s.bucket) + var subObjects []*storage.ObjectHandle + for partNumber := int64(1); partNumber <= noOfChunks; partNumber++ { + name := path.Join(s.prefix, snap.SnapDir, snap.SnapName, fmt.Sprintf("%010d", partNumber)) obj := bh.Object(name) - c := obj.ComposerFrom(subObjects...) - ctx, cancel := context.WithTimeout(context.TODO(), chunkUploadTimeout) - defer cancel() - if _, err := c.Run(ctx); err != nil { - return fmt.Errorf("failed uploading composite object for snapshot with error: %v", err) - } - logrus.Info("Composite object uploaded successfully.") - return nil + subObjects = append(subObjects, obj) } - - return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err) + name := path.Join(s.prefix, snap.SnapDir, snap.SnapName) + obj := bh.Object(name) + c := obj.ComposerFrom(subObjects...) + ctx, cancel := context.WithTimeout(context.TODO(), chunkUploadTimeout) + defer cancel() + if _, err := c.Run(ctx); err != nil { + return fmt.Errorf("failed uploading composite object for snapshot with error: %v", err) + } + logrus.Info("Composite object uploaded successfully.") + return nil } func (s *GCSSnapStore) uploadComponent(snap *Snapshot, file *os.File, offset, chunkSize int64) error { @@ -177,13 +176,12 @@ func (s *GCSSnapStore) componentUploader(wg *sync.WaitGroup, stopCh <-chan struc select { case <-stopCh: return - case chunk, more := <-chunkUploadCh: - if !more { + case chunk, ok := <-chunkUploadCh: + if !ok { return } logrus.Infof("Uploading chunk with offset : %d, attempt: %d", chunk.offset, chunk.attempt) err := s.uploadComponent(snap, file, chunk.offset, chunk.size) - logrus.Infof("For chunk upload of offset %d, err %v", chunk.offset, err) errCh <- chunkUploadResult{ err: err, chunk: &chunk, diff --git a/pkg/snapstore/s3_snapstore.go b/pkg/snapstore/s3_snapstore.go index 26bcd0c94..28dc84063 100644 --- a/pkg/snapstore/s3_snapstore.go +++ b/pkg/snapstore/s3_snapstore.go @@ -230,13 +230,12 @@ func (s *S3SnapStore) partUploader(wg *sync.WaitGroup, stopCh <-chan struct{}, s select { case <-stopCh: return - case chunk, more := <-chunkUploadCh: - if !more { + case chunk, ok := <-chunkUploadCh: + if !ok { return } logrus.Infof("Uploading chunk with id: %d, offset: %d, attempt: %d", chunk.id, chunk.offset, chunk.attempt) err := s.uploadPart(snap, file, uploadID, completedParts, chunk.offset, chunk.size) - logrus.Infof("For chunk upload of id: %d, offset: %d, error: %v", chunk.id, chunk.offset, err) errCh <- chunkUploadResult{ err: err, chunk: &chunk, @@ -247,25 +246,28 @@ func (s *S3SnapStore) partUploader(wg *sync.WaitGroup, stopCh <-chan struct{}, s // List will list the snapshots from store func (s *S3SnapStore) List() (SnapList, error) { - resp, err := s.client.ListObjects(&s3.ListObjectsInput{ + var snapList SnapList + in := &s3.ListObjectsInput{ Bucket: aws.String(s.bucket), Prefix: aws.String(fmt.Sprintf("%s/", s.prefix)), + } + err := s.client.ListObjectsPages(in, func(page *s3.ListObjectsOutput, lastPage bool) bool { + for _, key := range page.Contents { + k := (*key.Key)[len(*page.Prefix):] + snap, err := ParseSnapshot(k) + if err != nil { + // Warning + logrus.Warnf("Invalid snapshot found. Ignoring it: %s", k) + } else { + snapList = append(snapList, snap) + } + } + return !lastPage }) if err != nil { return nil, err } - var snapList SnapList - for _, key := range resp.Contents { - k := (*key.Key)[len(*resp.Prefix):] - snap, err := ParseSnapshot(k) - if err != nil { - // Warning - logrus.Warnf("Invalid snapshot found. Ignoring it:%s\n", k) - } else { - snapList = append(snapList, snap) - } - } sort.Sort(snapList) return snapList, nil } diff --git a/pkg/snapstore/swift_snapstore.go b/pkg/snapstore/swift_snapstore.go index e5379a4e0..a1a4110e9 100644 --- a/pkg/snapstore/swift_snapstore.go +++ b/pkg/snapstore/swift_snapstore.go @@ -137,22 +137,21 @@ func (s *SwiftSnapStore) Save(snap Snapshot, r io.Reader) error { snapshotErr := collectChunkUploadError(chunkUploadCh, resCh, cancelCh, noOfChunks) wg.Wait() - if snapshotErr == nil { - logrus.Info("All chunk uploaded successfully. Uploading manifest.") - b := make([]byte, 0) - opts := objects.CreateOpts{ - Content: bytes.NewReader(b), - ContentLength: chunkSize, - ObjectManifest: path.Join(s.bucket, s.prefix, snap.SnapDir, snap.SnapName), - } - if res := objects.Create(s.client, s.bucket, path.Join(s.prefix, snap.SnapDir, snap.SnapName), opts); res.Err != nil { - return fmt.Errorf("failed uploading manifest for snapshot with error: %v", res.Err) - } - logrus.Info("Manifest object uploaded successfully.") - return nil + if snapshotErr != nil { + return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err) } - - return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err) + logrus.Info("All chunk uploaded successfully. Uploading manifest.") + b := make([]byte, 0) + opts := objects.CreateOpts{ + Content: bytes.NewReader(b), + ContentLength: chunkSize, + ObjectManifest: path.Join(s.bucket, s.prefix, snap.SnapDir, snap.SnapName), + } + if res := objects.Create(s.client, s.bucket, path.Join(s.prefix, snap.SnapDir, snap.SnapName), opts); res.Err != nil { + return fmt.Errorf("failed uploading manifest for snapshot with error: %v", res.Err) + } + logrus.Info("Manifest object uploaded successfully.") + return nil } func (s *SwiftSnapStore) uploadChunk(snap *Snapshot, file *os.File, offset, chunkSize int64) error { @@ -182,15 +181,14 @@ func (s *SwiftSnapStore) chunkUploader(wg *sync.WaitGroup, stopCh <-chan struct{ select { case <-stopCh: return - case chunk, more := <-chunkUploadCh: - if !more { + case chunk, ok := <-chunkUploadCh: + if !ok { return } logrus.Infof("Uploading chunk with offset : %d, attempt: %d", chunk.offset, chunk.attempt) err := s.uploadChunk(snap, file, chunk.offset, chunk.size) - logrus.Infof("For chunk upload of offset %d, err %v", chunk.offset, err) errCh <- chunkUploadResult{ - err: nil, + err: err, chunk: &chunk, } } diff --git a/pkg/snapstore/types.go b/pkg/snapstore/types.go index e834d1efe..d874c85c0 100644 --- a/pkg/snapstore/types.go +++ b/pkg/snapstore/types.go @@ -86,7 +86,7 @@ type Config struct { Prefix string // MaxParallelChunkUploads hold the maximum number of parallel chunk uploads allowed. MaxParallelChunkUploads int - // Temporay Directory + // Temporary Directory TempDir string } diff --git a/pkg/snapstore/utils.go b/pkg/snapstore/utils.go index fea2b6155..092eae14d 100644 --- a/pkg/snapstore/utils.go +++ b/pkg/snapstore/utils.go @@ -86,15 +86,16 @@ func collectChunkUploadError(chunkUploadCh chan<- chunk, resCh <-chan chunkUploa for chunkRes := range resCh { logrus.Infof("Received chunk result for id: %d, offset: %d", chunkRes.chunk.id, chunkRes.chunk.offset) if chunkRes.err != nil { + logrus.Infof("Chunk upload failed for id: %d, offset: %d with err: %v", chunkRes.chunk.id, chunkRes.chunk.offset, chunkRes.err) if chunkRes.chunk.attempt == maxRetryAttempts { - logrus.Errorf("Received the chunk upload error from one of the workers. Sending stop signal to all workers.") + logrus.Errorf("Received the chunk upload error even after %d attempts from one of the workers. Sending stop signal to all workers.", chunkRes.chunk.attempt) close(stopCh) return &chunkRes } chunk := chunkRes.chunk delayTime := (1 << chunk.attempt) chunk.attempt++ - logrus.Warnf("Will try to upload chunk id:%d, offset: %d at attempt %d after %d seconds", chunk.id, chunk.offset, chunk.attempt, delayTime) + logrus.Warnf("Will try to upload chunk id: %d, offset: %d at attempt %d after %d seconds", chunk.id, chunk.offset, chunk.attempt, delayTime) time.AfterFunc(time.Duration(delayTime)*time.Second, func() { select { case <-stopCh: @@ -103,11 +104,13 @@ func collectChunkUploadError(chunkUploadCh chan<- chunk, resCh <-chan chunkUploa chunkUploadCh <- *chunk } }) - } - remainingChunks-- - if remainingChunks == 0 { - close(stopCh) - break + } else { + remainingChunks-- + if remainingChunks == 0 { + logrus.Infof("Received successful chunk result for all chunks. Stopping workers.") + close(stopCh) + break + } } } return nil