Skip to content

Commit

Permalink
Fix the error handling for chunk uploads on Azure and Swift
Browse files Browse the repository at this point in the history
Signed-off-by: Swapnil Mhamane <[email protected]>
  • Loading branch information
Swapnil Mhamane committed Nov 28, 2018
1 parent 40558fa commit ff934a8
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 88 deletions.
43 changes: 20 additions & 23 deletions pkg/snapstore/abs_snapstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,26 +165,25 @@ func (a *ABSSnapStore) Save(snap Snapshot, r io.Reader) error {
snapshotErr := collectChunkUploadError(chunkUploadCh, resCh, cancelCh, noOfChunks)
wg.Wait()

if snapshotErr == nil {
logrus.Info("All chunk uploaded successfully. Uploading blocklist.")
blobName := path.Join(a.prefix, snap.SnapDir, snap.SnapName)
blob := a.container.GetBlobReference(blobName)
var blockList []storage.Block
for partNumber := int64(1); partNumber <= noOfChunks; partNumber++ {
block := storage.Block{
ID: base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%010d", partNumber))),
Status: storage.BlockStatusUncommitted,
}
blockList = append(blockList, block)
}
if err := blob.PutBlockList(blockList, &storage.PutBlockListOptions{}); err != nil {
return fmt.Errorf("failed uploading blocklist for snapshot with error: %v", err)
if snapshotErr != nil {
return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err)
}
logrus.Info("All chunk uploaded successfully. Uploading blocklist.")
blobName := path.Join(a.prefix, snap.SnapDir, snap.SnapName)
blob := a.container.GetBlobReference(blobName)
var blockList []storage.Block
for partNumber := int64(1); partNumber <= noOfChunks; partNumber++ {
block := storage.Block{
ID: base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%010d", partNumber))),
Status: storage.BlockStatusUncommitted,
}
logrus.Info("Blocklist uploaded successfully.")
return nil
blockList = append(blockList, block)
}

return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err)
if err := blob.PutBlockList(blockList, &storage.PutBlockListOptions{}); err != nil {
return fmt.Errorf("failed uploading blocklist for snapshot with error: %v", err)
}
logrus.Info("Blocklist uploaded successfully.")
return nil
}

func (a *ABSSnapStore) uploadBlock(snap *Snapshot, file *os.File, offset, chunkSize int64) error {
Expand Down Expand Up @@ -213,18 +212,16 @@ func (a *ABSSnapStore) blockUploader(wg *sync.WaitGroup, stopCh <-chan struct{},
select {
case <-stopCh:
return
case chunk, more := <-chunkUploadCh:
if !more {
case chunk, ok := <-chunkUploadCh:
if !ok {
return
}
logrus.Infof("Uploading chunk with offset : %d, attempt: %d", chunk.offset, chunk.attempt)
err := a.uploadBlock(snap, file, chunk.offset, chunk.size)
logrus.Infof("For chunk upload of offset %d, err %v", chunk.offset, err)
errCh <- chunkUploadResult{
err: nil,
err: err,
chunk: &chunk,
}

}
}
}
Expand Down
44 changes: 21 additions & 23 deletions pkg/snapstore/gcs_snapstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,28 +122,27 @@ func (s *GCSSnapStore) Save(snap Snapshot, r io.Reader) error {
snapshotErr := collectChunkUploadError(chunkUploadCh, resCh, cancelCh, noOfChunks)
wg.Wait()

if snapshotErr == nil {
logrus.Info("All chunk uploaded successfully. Uploading composite object.")
bh := s.client.Bucket(s.bucket)
var subObjects []*storage.ObjectHandle
for partNumber := int64(1); partNumber <= noOfChunks; partNumber++ {
name := path.Join(s.prefix, snap.SnapDir, snap.SnapName, fmt.Sprintf("%010d", partNumber))
obj := bh.Object(name)
subObjects = append(subObjects, obj)
}
name := path.Join(s.prefix, snap.SnapDir, snap.SnapName)
if snapshotErr != nil {
return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err)
}
logrus.Info("All chunk uploaded successfully. Uploading composite object.")
bh := s.client.Bucket(s.bucket)
var subObjects []*storage.ObjectHandle
for partNumber := int64(1); partNumber <= noOfChunks; partNumber++ {
name := path.Join(s.prefix, snap.SnapDir, snap.SnapName, fmt.Sprintf("%010d", partNumber))
obj := bh.Object(name)
c := obj.ComposerFrom(subObjects...)
ctx, cancel := context.WithTimeout(context.TODO(), chunkUploadTimeout)
defer cancel()
if _, err := c.Run(ctx); err != nil {
return fmt.Errorf("failed uploading composite object for snapshot with error: %v", err)
}
logrus.Info("Composite object uploaded successfully.")
return nil
subObjects = append(subObjects, obj)
}

return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err)
name := path.Join(s.prefix, snap.SnapDir, snap.SnapName)
obj := bh.Object(name)
c := obj.ComposerFrom(subObjects...)
ctx, cancel := context.WithTimeout(context.TODO(), chunkUploadTimeout)
defer cancel()
if _, err := c.Run(ctx); err != nil {
return fmt.Errorf("failed uploading composite object for snapshot with error: %v", err)
}
logrus.Info("Composite object uploaded successfully.")
return nil
}

func (s *GCSSnapStore) uploadComponent(snap *Snapshot, file *os.File, offset, chunkSize int64) error {
Expand Down Expand Up @@ -177,13 +176,12 @@ func (s *GCSSnapStore) componentUploader(wg *sync.WaitGroup, stopCh <-chan struc
select {
case <-stopCh:
return
case chunk, more := <-chunkUploadCh:
if !more {
case chunk, ok := <-chunkUploadCh:
if !ok {
return
}
logrus.Infof("Uploading chunk with offset : %d, attempt: %d", chunk.offset, chunk.attempt)
err := s.uploadComponent(snap, file, chunk.offset, chunk.size)
logrus.Infof("For chunk upload of offset %d, err %v", chunk.offset, err)
errCh <- chunkUploadResult{
err: err,
chunk: &chunk,
Expand Down
32 changes: 17 additions & 15 deletions pkg/snapstore/s3_snapstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,12 @@ func (s *S3SnapStore) partUploader(wg *sync.WaitGroup, stopCh <-chan struct{}, s
select {
case <-stopCh:
return
case chunk, more := <-chunkUploadCh:
if !more {
case chunk, ok := <-chunkUploadCh:
if !ok {
return
}
logrus.Infof("Uploading chunk with id: %d, offset: %d, attempt: %d", chunk.id, chunk.offset, chunk.attempt)
err := s.uploadPart(snap, file, uploadID, completedParts, chunk.offset, chunk.size)
logrus.Infof("For chunk upload of id: %d, offset: %d, error: %v", chunk.id, chunk.offset, err)
errCh <- chunkUploadResult{
err: err,
chunk: &chunk,
Expand All @@ -247,25 +246,28 @@ func (s *S3SnapStore) partUploader(wg *sync.WaitGroup, stopCh <-chan struct{}, s

// List will list the snapshots from store
func (s *S3SnapStore) List() (SnapList, error) {
resp, err := s.client.ListObjects(&s3.ListObjectsInput{
var snapList SnapList
in := &s3.ListObjectsInput{
Bucket: aws.String(s.bucket),
Prefix: aws.String(fmt.Sprintf("%s/", s.prefix)),
}
err := s.client.ListObjectsPages(in, func(page *s3.ListObjectsOutput, lastPage bool) bool {
for _, key := range page.Contents {
k := (*key.Key)[len(*page.Prefix):]
snap, err := ParseSnapshot(k)
if err != nil {
// Warning
logrus.Warnf("Invalid snapshot found. Ignoring it: %s", k)
} else {
snapList = append(snapList, snap)
}
}
return !lastPage
})
if err != nil {
return nil, err
}

var snapList SnapList
for _, key := range resp.Contents {
k := (*key.Key)[len(*resp.Prefix):]
snap, err := ParseSnapshot(k)
if err != nil {
// Warning
logrus.Warnf("Invalid snapshot found. Ignoring it:%s\n", k)
} else {
snapList = append(snapList, snap)
}
}
sort.Sort(snapList)
return snapList, nil
}
Expand Down
36 changes: 17 additions & 19 deletions pkg/snapstore/swift_snapstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,22 +137,21 @@ func (s *SwiftSnapStore) Save(snap Snapshot, r io.Reader) error {
snapshotErr := collectChunkUploadError(chunkUploadCh, resCh, cancelCh, noOfChunks)
wg.Wait()

if snapshotErr == nil {
logrus.Info("All chunk uploaded successfully. Uploading manifest.")
b := make([]byte, 0)
opts := objects.CreateOpts{
Content: bytes.NewReader(b),
ContentLength: chunkSize,
ObjectManifest: path.Join(s.bucket, s.prefix, snap.SnapDir, snap.SnapName),
}
if res := objects.Create(s.client, s.bucket, path.Join(s.prefix, snap.SnapDir, snap.SnapName), opts); res.Err != nil {
return fmt.Errorf("failed uploading manifest for snapshot with error: %v", res.Err)
}
logrus.Info("Manifest object uploaded successfully.")
return nil
if snapshotErr != nil {
return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err)
}

return fmt.Errorf("failed uploading chunk, id: %d, offset: %d, error: %v", snapshotErr.chunk.id, snapshotErr.chunk.offset, snapshotErr.err)
logrus.Info("All chunk uploaded successfully. Uploading manifest.")
b := make([]byte, 0)
opts := objects.CreateOpts{
Content: bytes.NewReader(b),
ContentLength: chunkSize,
ObjectManifest: path.Join(s.bucket, s.prefix, snap.SnapDir, snap.SnapName),
}
if res := objects.Create(s.client, s.bucket, path.Join(s.prefix, snap.SnapDir, snap.SnapName), opts); res.Err != nil {
return fmt.Errorf("failed uploading manifest for snapshot with error: %v", res.Err)
}
logrus.Info("Manifest object uploaded successfully.")
return nil
}

func (s *SwiftSnapStore) uploadChunk(snap *Snapshot, file *os.File, offset, chunkSize int64) error {
Expand Down Expand Up @@ -182,15 +181,14 @@ func (s *SwiftSnapStore) chunkUploader(wg *sync.WaitGroup, stopCh <-chan struct{
select {
case <-stopCh:
return
case chunk, more := <-chunkUploadCh:
if !more {
case chunk, ok := <-chunkUploadCh:
if !ok {
return
}
logrus.Infof("Uploading chunk with offset : %d, attempt: %d", chunk.offset, chunk.attempt)
err := s.uploadChunk(snap, file, chunk.offset, chunk.size)
logrus.Infof("For chunk upload of offset %d, err %v", chunk.offset, err)
errCh <- chunkUploadResult{
err: nil,
err: err,
chunk: &chunk,
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/snapstore/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type Config struct {
Prefix string
// MaxParallelChunkUploads hold the maximum number of parallel chunk uploads allowed.
MaxParallelChunkUploads int
// Temporay Directory
// Temporary Directory
TempDir string
}

Expand Down
17 changes: 10 additions & 7 deletions pkg/snapstore/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,16 @@ func collectChunkUploadError(chunkUploadCh chan<- chunk, resCh <-chan chunkUploa
for chunkRes := range resCh {
logrus.Infof("Received chunk result for id: %d, offset: %d", chunkRes.chunk.id, chunkRes.chunk.offset)
if chunkRes.err != nil {
logrus.Infof("Chunk upload failed for id: %d, offset: %d with err: %v", chunkRes.chunk.id, chunkRes.chunk.offset, chunkRes.err)
if chunkRes.chunk.attempt == maxRetryAttempts {
logrus.Errorf("Received the chunk upload error from one of the workers. Sending stop signal to all workers.")
logrus.Errorf("Received the chunk upload error even after %d attempts from one of the workers. Sending stop signal to all workers.", chunkRes.chunk.attempt)
close(stopCh)
return &chunkRes
}
chunk := chunkRes.chunk
delayTime := (1 << chunk.attempt)
chunk.attempt++
logrus.Warnf("Will try to upload chunk id:%d, offset: %d at attempt %d after %d seconds", chunk.id, chunk.offset, chunk.attempt, delayTime)
logrus.Warnf("Will try to upload chunk id: %d, offset: %d at attempt %d after %d seconds", chunk.id, chunk.offset, chunk.attempt, delayTime)
time.AfterFunc(time.Duration(delayTime)*time.Second, func() {
select {
case <-stopCh:
Expand All @@ -103,11 +104,13 @@ func collectChunkUploadError(chunkUploadCh chan<- chunk, resCh <-chan chunkUploa
chunkUploadCh <- *chunk
}
})
}
remainingChunks--
if remainingChunks == 0 {
close(stopCh)
break
} else {
remainingChunks--
if remainingChunks == 0 {
logrus.Infof("Received successful chunk result for all chunks. Stopping workers.")
close(stopCh)
break
}
}
}
return nil
Expand Down

0 comments on commit ff934a8

Please sign in to comment.