Fix #4885: Use multipart upload instead of CopyObject for touching file > 2GB

Signed-off-by: Bertrand Paquet <[email protected]>
bpaquet committed Aug 20, 2024
1 parent 49f3d8f commit b713f47
Showing 1 changed file with 93 additions and 15 deletions.
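
S3's CopyObject API rejects source objects larger than 5 GiB in a single request, so refreshing ("touching") a bigger cache blob has to be done as a multipart upload whose parts are server-side copies (UploadPartCopy). The sketch below is illustrative only and not part of the commit: it reproduces the byte-range arithmetic of the buildCopySourceRange helper added in the diff, using an arbitrary 12 GiB object size.

// A minimal sketch (not part of the commit) of the byte-range math used when
// copying an object in 5 GiB parts. It assumes the same maxCopyObjectSize
// value as the diff below.
package main

import (
	"fmt"
	"strconv"
)

const maxCopyObjectSize = 5 * 1024 * 1024 * 1024 // 5 GiB, CopyObject's per-request ceiling

// buildCopySourceRange returns an inclusive "bytes=start-end" range covering
// at most maxCopyObjectSize bytes, mirroring the helper added in this commit.
func buildCopySourceRange(start int64, objectSize int64) string {
	end := start + maxCopyObjectSize - 1
	if end > objectSize {
		end = objectSize - 1
	}
	return "bytes=" + strconv.FormatInt(start, 10) + "-" + strconv.FormatInt(end, 10)
}

func main() {
	// Example: a 12 GiB object is touched in three server-side copy ranges.
	var objectSize int64 = 12 * 1024 * 1024 * 1024
	for pos := int64(0); pos < objectSize; pos += maxCopyObjectSize {
		fmt.Println(buildCopySourceRange(pos, objectSize))
	}
	// Output:
	// bytes=0-5368709119
	// bytes=5368709120-10737418239
	// bytes=10737418240-12884901887
}
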
108 changes: 93 additions & 15 deletions cache/remotecache/s3/s3.go
@@ -16,6 +16,7 @@ import (
 	"github.com/aws/aws-sdk-go-v2/credentials"
 	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
 	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/aws/aws-sdk-go-v2/service/s3/types"
 	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
 	"github.com/containerd/containerd/content"
 	"github.com/containerd/containerd/labels"
@@ -44,6 +45,7 @@ const (
 	attrSecretAccessKey = "secret_access_key"
 	attrSessionToken = "session_token"
 	attrUsePathStyle = "use_path_style"
+	maxCopyObjectSize = 5 * 1024 * 1024 * 1024
 )
 
 type Config struct {
@@ -203,13 +205,13 @@ func (e *exporter) Finalize(ctx context.Context) (map[string]string, error) {
 		}
 
 		key := e.s3Client.blobKey(dgstPair.Descriptor.Digest)
-		exists, err := e.s3Client.exists(ctx, key)
+		exists, size, err := e.s3Client.exists(ctx, key)
 		if err != nil {
 			return nil, errors.Wrapf(err, "failed to check file presence in cache")
 		}
 		if exists != nil {
 			if time.Since(*exists) > e.config.TouchRefresh {
-				err = e.s3Client.touch(ctx, key)
+				err = e.s3Client.touch(ctx, key, size)
 				if err != nil {
 					return nil, errors.Wrapf(err, "failed to touch file")
 				}
@@ -449,7 +451,7 @@ func (s3Client *s3Client) saveMutableAt(ctx context.Context, key string, body io
 	return err
 }
 
-func (s3Client *s3Client) exists(ctx context.Context, key string) (*time.Time, error) {
+func (s3Client *s3Client) exists(ctx context.Context, key string) (*time.Time, *int64, error) {
 	input := &s3.HeadObjectInput{
 		Bucket: &s3Client.bucket,
 		Key: &key,
@@ -458,26 +460,102 @@ func (s3Client *s3Client) exists(ctx context.Context, key string) (*time.Time, e
 	head, err := s3Client.HeadObject(ctx, input)
 	if err != nil {
 		if isNotFound(err) {
-			return nil, nil
+			return nil, nil, nil
 		}
-		return nil, err
+		return nil, nil, err
 	}
+	return head.LastModified, head.ContentLength, nil
+}
+
+func buildCopySourceRange(start int64, objectSize int64) string {
+	end := start + maxCopyObjectSize - 1
+	if end > objectSize {
+		end = objectSize - 1
+	}
-	return head.LastModified, nil
+	startRange := strconv.FormatInt(start, 10)
+	stopRange := strconv.FormatInt(end, 10)
+	return "bytes=" + startRange + "-" + stopRange
 }
 
-func (s3Client *s3Client) touch(ctx context.Context, key string) error {
+func (s3Client *s3Client) touch(ctx context.Context, key string, size *int64) error {
 	copySource := fmt.Sprintf("%s/%s", s3Client.bucket, key)
-	cp := &s3.CopyObjectInput{
-		Bucket: &s3Client.bucket,
-		CopySource: &copySource,
-		Key: &key,
-		Metadata: map[string]string{"updated-at": time.Now().String()},
-		MetadataDirective: "REPLACE",
+
+	// CopyObject does not support files > 5GB
+	if *size < maxCopyObjectSize {
+		cp := &s3.CopyObjectInput{
+			Bucket: &s3Client.bucket,
+			CopySource: &copySource,
+			Key: &key,
+			Metadata: map[string]string{"updated-at": time.Now().String()},
+			MetadataDirective: "REPLACE",
+		}
+
+		_, err := s3Client.CopyObject(ctx, cp)
+
+		return err
+	}
+	input := &s3.CreateMultipartUploadInput{
+		Bucket: &s3Client.bucket,
+		Key: &key,
 	}
 
-	_, err := s3Client.CopyObject(ctx, cp)
+	output, err := s3Client.CreateMultipartUpload(ctx, input)
+	if err != nil {
+		return err
+	}
+
+	var currentPartNumber int32 = 1
+	var currentPosition int64 = 0
+	var completedParts []types.CompletedPart
+
+	for currentPosition < *size {
+		copyRange := buildCopySourceRange(currentPosition, *size)
+		partInput := s3.UploadPartCopyInput{
+			Bucket: &s3Client.bucket,
+			CopySource: &copySource,
+			CopySourceRange: &copyRange,
+			Key: &key,
+			PartNumber: &currentPartNumber,
+			UploadId: output.UploadId,
+		}
+		uploadPartCopyResult, err := s3Client.UploadPartCopy(ctx, &partInput)
+		if err != nil {
+			abortIn := s3.AbortMultipartUploadInput{
+				Bucket: &s3Client.bucket,
+				Key: &key,
+				UploadId: output.UploadId,
+			}
+			_, errAbort := s3Client.AbortMultipartUpload(ctx, &abortIn)
+			if errAbort != nil {
+				return errAbort
+			}
+			return err
+		}
+		partNumber := new(int32)
+		*partNumber = currentPartNumber
+		completedParts = append(completedParts, types.CompletedPart{
+			ETag: uploadPartCopyResult.CopyPartResult.ETag,
+			PartNumber: partNumber,
+		})
+
+		currentPartNumber++
+		currentPosition += maxCopyObjectSize
+	}
+
+	CompleteMultipartUploadInput := &s3.CompleteMultipartUploadInput{
+		Bucket: &s3Client.bucket,
+		Key: &key,
+		UploadId: output.UploadId,
+		MultipartUpload: &types.CompletedMultipartUpload{
+			Parts: completedParts,
+		},
+	}
+	_, completeErr := s3Client.CompleteMultipartUpload(ctx, CompleteMultipartUploadInput)
+	if completeErr != nil {
+		return completeErr
+	}
 
-	return err
+	return nil
 }
 
 func (s3Client *s3Client) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {
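For reference, the full sequence the new code performs — CreateMultipartUpload, a loop of ranged UploadPartCopy calls, CompleteMultipartUpload, and AbortMultipartUpload on failure — can be exercised on its own. The following standalone sketch illustrates that pattern and is not the buildkit code: the bucket name, key, object size, and the choice to attach metadata at CreateMultipartUpload time are placeholders assumed for the example.

// A self-contained sketch of a server-side multipart copy "touch", assuming
// default AWS credentials and a hypothetical example-bucket/example-key object.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

const partSize = 5 * 1024 * 1024 * 1024 // 5 GiB, the CopyObject per-request ceiling

func touchLargeObject(ctx context.Context, client *s3.Client, bucket, key string, size int64) error {
	copySource := fmt.Sprintf("%s/%s", bucket, key)

	// Start a multipart upload that overwrites the object with copies of itself,
	// refreshing its LastModified timestamp.
	mpu, err := client.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
		Bucket:   aws.String(bucket),
		Key:      aws.String(key),
		Metadata: map[string]string{"updated-at": time.Now().String()},
	})
	if err != nil {
		return err
	}

	var parts []types.CompletedPart
	partNumber := int32(1)
	for pos := int64(0); pos < size; pos += partSize {
		end := pos + partSize - 1
		if end > size-1 {
			end = size - 1
		}
		rng := fmt.Sprintf("bytes=%d-%d", pos, end)

		// Each part is copied server side; no data flows through the client.
		res, err := client.UploadPartCopy(ctx, &s3.UploadPartCopyInput{
			Bucket:          aws.String(bucket),
			Key:             aws.String(key),
			CopySource:      aws.String(copySource),
			CopySourceRange: aws.String(rng),
			PartNumber:      aws.Int32(partNumber),
			UploadId:        mpu.UploadId,
		})
		if err != nil {
			// Abort so the bucket is not left with orphaned parts.
			_, _ = client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
				Bucket:   aws.String(bucket),
				Key:      aws.String(key),
				UploadId: mpu.UploadId,
			})
			return err
		}
		parts = append(parts, types.CompletedPart{
			ETag:       res.CopyPartResult.ETag,
			PartNumber: aws.Int32(partNumber),
		})
		partNumber++
	}

	_, err = client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
		Bucket:          aws.String(bucket),
		Key:             aws.String(key),
		UploadId:        mpu.UploadId,
		MultipartUpload: &types.CompletedMultipartUpload{Parts: parts},
	})
	return err
}

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	client := s3.NewFromConfig(cfg)

	// Hypothetical object; in the commit the size comes from a HeadObject call.
	if err := touchLargeObject(ctx, client, "example-bucket", "example-key", 12*1024*1024*1024); err != nil {
		log.Fatal(err)
	}
}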
