Skip to content

Commit

Permalink
Refactor upload to GCS
Browse files Browse the repository at this point in the history
  • Loading branch information
aledbf authored and roboquat committed May 12, 2022
1 parent d9ca332 commit 8c50daf
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 225 deletions.
1 change: 0 additions & 1 deletion components/content-service-api/go/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ type GCPConfig struct {
CredentialsFile string `json:"credentialsFile"`
Region string `json:"region"`
Project string `json:"projectId"`
ParallelUpload int `json:"parallelUpload"`

MaximumBackupCount int `json:"maximumBackupCount"`
}
Expand Down
7 changes: 4 additions & 3 deletions components/content-service/pkg/archive/tar.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,17 @@ func ExtractTarbal(ctx context.Context, src io.Reader, dst string, opts ...TarOp
// We need to remap the UID and GID between the host and the container to avoid permission issues.
for _, p := range paths {
v := m[p]
uid := toHostID(v.UID, cfg.UIDMaps)
gid := toHostID(v.GID, cfg.GIDMaps)

if v.IsSymlink {
continue
}

uid := toHostID(v.UID, cfg.UIDMaps)
gid := toHostID(v.GID, cfg.GIDMaps)

err = remapFile(path.Join(dst, p), uid, gid, v.Xattrs)
if err != nil {
log.WithError(err).WithField("uid", uid).WithField("gid", gid).WithField("path", p).Warn("cannot chown")
log.WithError(err).WithField("uid", uid).WithField("gid", gid).WithField("path", p).Debug("cannot chown")
}
}

Expand Down
265 changes: 48 additions & 217 deletions components/content-service/pkg/storage/gcloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"hash/crc32"
"io"
"math/rand"
"net/http"
"os"
"path/filepath"
Expand All @@ -20,9 +18,9 @@ import (
"sync"
"time"

"cloud.google.com/go/storage"
gcpstorage "cloud.google.com/go/storage"
validation "github.com/go-ozzo/ozzo-validation"
"github.com/opentracing/opentracing-go"
"golang.org/x/oauth2/google"
"golang.org/x/xerrors"
"google.golang.org/api/googleapi"
Expand All @@ -33,7 +31,6 @@ import (
"github.com/gitpod-io/gitpod/common-go/tracing"
config "github.com/gitpod-io/gitpod/content-service/api/config"
"github.com/gitpod-io/gitpod/content-service/pkg/archive"
"github.com/opentracing/opentracing-go"
)

var _ DirectAccess = &DirectGCPStorage{}
Expand Down Expand Up @@ -174,8 +171,8 @@ func (rs *DirectGCPStorage) defaultObjectAccess(ctx context.Context, bkt, obj st
return nil, false, xerrors.Errorf("no gcloud client available - did you call Init()?")
}

hdl := rs.client.Bucket(bkt).Object(obj)
rc, err := hdl.NewReader(ctx)
objHandle := rs.client.Bucket(bkt).Object(obj)
rc, err := objHandle.NewReader(ctx)
if err != nil {
return nil, false, err
}
Expand Down Expand Up @@ -346,50 +343,64 @@ func (rs *DirectGCPStorage) Upload(ctx context.Context, source string, name stri
}
defer sfn.Close()

var totalSize int64
log.WithField("tasks", fmt.Sprintf("%d", rs.GCPConfig.ParallelUpload)).WithField("tmpfile", source).Debug("Uploading in parallel")
stat, err := sfn.Stat()
if err != nil {
return
}
totalSize = stat.Size()
span.SetTag("totalSize", totalSize)

uploadSpan := opentracing.StartSpan("remote-upload", opentracing.ChildOf(span.Context()))
uploadSpan.SetTag("bucket", rs.bucketName())
uploadSpan.SetTag("obj", rs.objectName(name))
/* Read back from the file in chunks. We don't wand a complicated composition operation,
* so we'll have 32 chunks max. See https://cloud.google.com/storage/docs/composite-objects
* for more details.
*/
var chunks []string
if chunks, err = rs.uploadChunks(opentracing.ContextWithSpan(ctx, uploadSpan), sfn, totalSize, rs.GCPConfig.ParallelUpload); err != nil {
tracing.FinishSpan(uploadSpan, &err)
return
}
defer func() {
err := rs.deleteChunks(opentracing.ContextWithSpan(ctx, uploadSpan), chunks)
if err != nil {
log.WithError(err).WithField("name", name).Warn("cannot clean up upload chunks")
}
}()

log.WithField("workspaceId", rs.WorkspaceName).WithField("bucketName", rs.bucketName()).Debug("Uploaded chunks")
totalSize := stat.Size()
span.SetTag("totalSize", totalSize)

// compose the uploaded chunks
bucket = rs.bucketName()
bkt := rs.client.Bucket(bucket)
src := make([]*gcpstorage.ObjectHandle, len(chunks))
for i := 0; i < len(chunks); i++ {
src[i] = bkt.Object(chunks[i])
}

object = rs.objectName(name)
obj := bkt.Object(object)

uploadSpan := opentracing.StartSpan("remote-upload", opentracing.ChildOf(span.Context()))
uploadSpan.SetTag("bucket", bucket)
uploadSpan.SetTag("obj", object)

var firstBackup bool
if _, e := obj.Attrs(ctx); e == gcpstorage.ErrObjectNotExist {
firstBackup = true
}

var wg sync.WaitGroup
var written int64

wg.Add(1)

go func() {
defer wg.Done()

wc := obj.NewWriter(ctx)
wc.Metadata = options.Annotations
wc.ContentType = options.ContentType
// Increase chunk size for faster uploading
wc.ChunkSize = googleapi.DefaultUploadChunkSize * 4

written, err = io.Copy(wc, sfn)
if err != nil {
log.WithError(err).WithField("name", name).Error("Error while uploading file")
return
}

// persist changes in GCS
err = wc.Close()
if err != nil {
log.WithError(err).WithField("name", name).Error("Error while uploading file")
return
}
}()

wg.Wait()

if written != totalSize {
err = xerrors.Errorf("Wrote fewer bytes than it should have, %d instead of %d", written, totalSize)
return
}

// maintain backup trail if we're asked to - we do this prior to overwriting the regular backup file
// to make sure we're trailign the previous backup.
if options.BackupTrail.Enabled && !firstBackup {
Expand All @@ -399,51 +410,8 @@ func (rs *DirectGCPStorage) Upload(ctx context.Context, source string, name stri
}
}

// now that the upload is complete and the backup trail has been created, compose the chunks to
// create the actual backup
_, err = obj.ComposerFrom(src...).Run(ctx)
if err != nil {
tracing.FinishSpan(uploadSpan, &err)
return
}
attrs, err := obj.Update(ctx, gcpstorage.ObjectAttrsToUpdate{
ContentType: options.ContentType,
Metadata: options.Annotations,
})
if err != nil {
tracing.FinishSpan(uploadSpan, &err)
return
}
log.WithField("chunkCount", fmt.Sprintf("%d", len(chunks))).Debug("Composited chunks")
uploadSpan.Finish()

// compare the MD5 sum of the composited object with the local tar file
remotehash := attrs.CRC32C
_, err = sfn.Seek(0, 0)
if err != nil {
log.WithError(err).Debug("cannot compute local checksum")

// us being unable to produce the local checksum is not enough of a reason to fail the upload
// altogether. We did upload something after all.
err = nil
return
}
var localhash uint32
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
_, err = io.Copy(h, sfn)
if err != nil {
log.WithError(err).Debug("cannot compute local checksum")
} else {
localhash = h.Sum32()
}
if remotehash == 0 || localhash == 0 {
log.WithField("remotehash", remotehash).WithField("localhash", localhash).Debug("one of the checksums is empty - not comparing")
} else if remotehash == localhash {
log.WithField("remotehash", remotehash).WithField("localhash", localhash).Debug("checksums match")
} else {
log.WithField("remotehash", remotehash).WithField("localhash", localhash).Debug("checksums do not match")
}

err = nil
return
}
Expand Down Expand Up @@ -480,133 +448,6 @@ func (rs *DirectGCPStorage) ensureBackupSlotAvailable() error {
return nil
}

func (rs *DirectGCPStorage) uploadChunks(ctx context.Context, f io.ReaderAt, totalSize int64, desiredChunkCount int) (chnks []string, err error) {
//nolint:ineffassign
span, ctx := opentracing.StartSpanFromContext(ctx, "uploadChunks")
defer tracing.FinishSpan(span, &err)

if totalSize == 0 {
return []string{}, xerrors.Errorf("Total size must be greater than zero")
}
if desiredChunkCount < 1 {
return []string{}, xerrors.Errorf("Desired chunk count must be greater (or equal to) one")
}

minChunkSize := int64(256 * 1024)
chunkSize := totalSize / int64(desiredChunkCount)
chunkSize = (chunkSize / minChunkSize) * minChunkSize
if chunkSize < minChunkSize {
chunkSize = minChunkSize
}
chunkCount := int(totalSize/chunkSize) + 1

log.WithField("count", chunkCount).WithField("chunkSize", chunkSize).WithField("totalSize", totalSize).Debug("Computed chunk size")

pfx := fmt.Sprintf("uploads/%s", randomString(20))

// sync construct taken from https://play.golang.org/p/mqUvKFDQbfn
errChannel := make(chan error, 1)
chunks := make([]string, chunkCount)
wg := sync.WaitGroup{}

// we need to add ourselves to the working group here, and not in the
// go routines, as they might start after the "finished" go routine.
wg.Add(chunkCount)

for i := 0; i < chunkCount; i++ {
off := int64(i) * chunkSize
n := chunkSize
if off+n > totalSize {
n = totalSize - off
}
r := io.NewSectionReader(f, off, n)
chunkName := fmt.Sprintf("%s/%d-upload", pfx, i)
chunks[i] = chunkName

go rs.uploadChunk(opentracing.ContextWithSpan(ctx, span), chunkName, r, n, &wg, errChannel)
}

// Put the wait group in a go routine.
// By putting the wait group in the go routine we ensure either all pass
// and we close the "finished" channel or we wait forever for the wait group
// to finish.
//
// Waiting forever is okay because of the blocking select below.
finished := make(chan bool, 1)
go func() {
wg.Wait()
close(finished)
}()

// This select will block until one of the two channels returns a value.
// This means on the first failure in the go routines above the errChannel will release a
// value first. Because there is a "return" statement in the err check this function will
// exit when an error occurs.
//
// Due to the blocking on wg.Wait() the finished channel will not get a value unless all
// the go routines before were successful because not all the wg.Done() calls would have
// happened.
select {
case <-finished:
log.Debug("Finished uploading")
case err := <-errChannel:
log.WithError(err).Debug("Error while uploading chunks")
if err != nil {
// already logged in uploadChunk
return []string{}, err
}
}

return chunks, nil
}

func (rs *DirectGCPStorage) uploadChunk(ctx context.Context, name string, r io.Reader, size int64, wg *sync.WaitGroup, errchan chan error) {
//nolint:ineffassign
span, ctx := opentracing.StartSpanFromContext(ctx, "uploadChunk")
span.SetTag("size", size)
defer span.Finish()

defer wg.Done()

start := time.Now()
log.WithField("name", name).WithField("size", fmt.Sprintf("%d", size)).Debug("Uploading chunk")

wc := rs.client.Bucket(rs.bucketName()).Object(name).NewWriter(ctx)
defer wc.Close()

written, err := io.Copy(wc, r)
if err != nil {
log.WithError(err).WithField("name", name).Error("Error while uploading chunk")
errchan <- err
return
}
if written != size {
err := xerrors.Errorf("Wrote fewer bytes than it should have, %d instead of %d", written, size)
log.WithError(err).WithField("name", name).Error("Error while uploading chunk")
errchan <- err
return
}

log.WithField("name", name).WithField("duration", time.Since(start)).Debug("Upload complete")
}

func (rs *DirectGCPStorage) deleteChunks(ctx context.Context, chunks []string) (err error) {
//nolint:ineffassign
span, ctx := opentracing.StartSpanFromContext(ctx, "deleteChunks")
defer tracing.FinishSpan(span, &err)

for i := 0; i < len(chunks); i++ {
err = rs.client.Bucket(rs.bucketName()).Object(chunks[i]).Delete(ctx)
}

if err != nil {
log.WithError(err).Error("Error while deleting chunks")
return err
}

return nil
}

func (rs *DirectGCPStorage) trailBackup(ctx context.Context, bkt *gcpstorage.BucketHandle, obj *gcpstorage.ObjectHandle, backupID string, trailLength int) (err error) {
//nolint:ineffassign
span, ctx := opentracing.StartSpanFromContext(ctx, "uploadChunk")
Expand Down Expand Up @@ -653,16 +494,6 @@ func (rs *DirectGCPStorage) trailBackup(ctx context.Context, bkt *gcpstorage.Buc
return nil
}

func randomString(len int) string {
min := 97
max := 122
bytes := make([]byte, len)
for i := 0; i < len; i++ {
bytes[i] = byte(min + rand.Intn(max-min))
}
return string(bytes)
}

func (rs *DirectGCPStorage) bucketName() string {
return gcpBucketName(rs.Stage, rs.Username)
}
Expand Down Expand Up @@ -803,7 +634,7 @@ func (p *PresignedGCPStorage) DiskUsage(ctx context.Context, bucket string, pref
}

var total int64
it := client.Bucket(bucket).Objects(ctx, &storage.Query{
it := client.Bucket(bucket).Objects(ctx, &gcpstorage.Query{
Prefix: prefix,
})
for {
Expand Down Expand Up @@ -939,7 +770,7 @@ func (p *PresignedGCPStorage) DeleteObject(ctx context.Context, bucket string, q
b := client.Bucket(bucket)
var it *gcpstorage.ObjectIterator
if prefix != "" && prefix != "/" {
it = b.Objects(ctx, &storage.Query{
it = b.Objects(ctx, &gcpstorage.Query{
Prefix: prefix,
})
} else {
Expand Down
Loading

0 comments on commit 8c50daf

Please sign in to comment.