S3 Cache: Use multipart upload instead of CopyObject for touching files > 5GB #5266

Merged 1 commit on Aug 22, 2024
1 change: 1 addition & 0 deletions README.md
@@ -576,6 +576,7 @@ Other options are:
* `name=<manifest>`: specify name of the manifest to use (default `buildkit`)
* Multiple manifest names can be specified at the same time, separated by `;`. The standard use case is to use the git sha1 as name, and the branch name as duplicate, and load both with 2 `import-cache` commands.
* `ignore-error=<false|true>`: specify if error is ignored in case cache export fails (default: `false`)
* `touch_refresh=24h`: instead of being re-uploaded when unchanged, blob files are "touched" on S3 every `touch_refresh` (default: 24h). This allows an expiration policy to be set on the S3 bucket so that stale files are cleaned up automatically. Manifest files are always rewritten, so they never need to be touched. See the sketch below for an example.
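
A minimal illustrative sketch of setting `touch_refresh` alongside the other S3 cache attributes (the region, bucket, and cache name values here are hypothetical placeholders):

```
buildctl build ... \
  --export-cache type=s3,region=eu-west-1,bucket=my-cache-bucket,name=my-image,touch_refresh=12h
```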

`--import-cache` options:
* `type=s3`
110 changes: 95 additions & 15 deletions cache/remotecache/s3/s3.go
@@ -16,6 +16,7 @@ import (
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/labels"
@@ -44,6 +45,7 @@ const (
attrSecretAccessKey = "secret_access_key"
attrSessionToken = "session_token"
attrUsePathStyle = "use_path_style"
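// CopyObject copies at most 5 GB in a single call, so objects at or above
// this size must be refreshed with a multipart copy instead.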
maxCopyObjectSize = 5 * 1024 * 1024 * 1024
)

type Config struct {
@@ -203,13 +205,13 @@ func (e *exporter) Finalize(ctx context.Context) (map[string]string, error) {
}

key := e.s3Client.blobKey(dgstPair.Descriptor.Digest)
exists, size, err := e.s3Client.exists(ctx, key)
if err != nil {
return nil, errors.Wrapf(err, "failed to check file presence in cache")
}
if exists != nil {
if time.Since(*exists) > e.config.TouchRefresh {
err = e.s3Client.touch(ctx, key, size)
if err != nil {
return nil, errors.Wrapf(err, "failed to touch file")
}
@@ -449,7 +451,7 @@ func (s3Client *s3Client) saveMutableAt(ctx context.Context, key string, body io
return err
}

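// exists reports the object's LastModified time and size via HeadObject;
// a nil time means the key is absent from the bucket.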
func (s3Client *s3Client) exists(ctx context.Context, key string) (*time.Time, *int64, error) {
input := &s3.HeadObjectInput{
Bucket: &s3Client.bucket,
Key: &key,
@@ -458,26 +460,104 @@ func (s3Client *s3Client) exists(ctx context.Context, key string) (*time.Time, e
head, err := s3Client.HeadObject(ctx, input)
if err != nil {
if isNotFound(err) {
return nil, nil, nil
}
return nil, nil, err
}
return head.LastModified, head.ContentLength, nil
}

func buildCopySourceRange(start int64, objectSize int64) string {
end := start + maxCopyObjectSize - 1
if end > objectSize-1 {
end = objectSize - 1
}
startRange := strconv.FormatInt(start, 10)
stopRange := strconv.FormatInt(end, 10)
return "bytes=" + startRange + "-" + stopRange
}
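// Worked example (a sketch, assuming a hypothetical 12 GiB object,
// objectSize = 12884901888, with maxCopyObjectSize = 5 GiB = 5368709120):
//
// buildCopySourceRange(0, 12884901888)           == "bytes=0-5368709119"
// buildCopySourceRange(5368709120, 12884901888)  == "bytes=5368709120-10737418239"
// buildCopySourceRange(10737418240, 12884901888) == "bytes=10737418240-12884901887"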

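// touch refreshes the object's LastModified timestamp without re-uploading
// its content: via an in-place CopyObject for small objects, or by copying
// the object onto itself with a multipart upload for larger ones.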
func (s3Client *s3Client) touch(ctx context.Context, key string, size *int64) (err error) {
copySource := fmt.Sprintf("%s/%s", s3Client.bucket, key)

// CopyObject cannot copy objects larger than 5 GB in one call
if *size < maxCopyObjectSize {
cp := &s3.CopyObjectInput{
Bucket: &s3Client.bucket,
CopySource: &copySource,
Key: &key,
Metadata: map[string]string{"updated-at": time.Now().String()},
MetadataDirective: "REPLACE",
}

_, err := s3Client.CopyObject(ctx, cp)

return err
}
input := &s3.CreateMultipartUploadInput{
Bucket: &s3Client.bucket,
Key: &key,
}

output, err := s3Client.CreateMultipartUpload(ctx, input)
if err != nil {
return err
}

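// If any later step fails, abort the multipart upload so S3 does not keep
// the orphaned parts around (incomplete multipart uploads are still billed).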
defer func() {
abortIn := s3.AbortMultipartUploadInput{
Bucket: &s3Client.bucket,
Key: &key,
UploadId: output.UploadId,
}
if err != nil {
// Review (Member): This requires err to be a named return value as well.
// Review (Contributor Author): Done

s3Client.AbortMultipartUpload(ctx, &abortIn)
}
}()
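// Copy the object onto itself in chunks of at most maxCopyObjectSize,
// recording each completed part for the CompleteMultipartUpload call below.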

var currentPartNumber int32 = 1
var currentPosition int64
var completedParts []s3types.CompletedPart

for currentPosition < *size {
copyRange := buildCopySourceRange(currentPosition, *size)
partInput := s3.UploadPartCopyInput{
Bucket: &s3Client.bucket,
CopySource: &copySource,
CopySourceRange: &copyRange,
Key: &key,
PartNumber: &currentPartNumber,
UploadId: output.UploadId,
}
uploadPartCopyResult, err := s3Client.UploadPartCopy(ctx, &partInput)
if err != nil {
return err
}
partNumber := currentPartNumber
completedParts = append(completedParts, s3types.CompletedPart{
ETag: uploadPartCopyResult.CopyPartResult.ETag,
PartNumber: &partNumber,
})

currentPartNumber++
currentPosition += maxCopyObjectSize
}
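// Completing the upload has S3 assemble the parts into a new object,
// which refreshes the object's LastModified timestamp.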

completeMultipartUploadInput := &s3.CompleteMultipartUploadInput{
Bucket: &s3Client.bucket,
Key: &key,
UploadId: output.UploadId,
MultipartUpload: &s3types.CompletedMultipartUpload{
Parts: completedParts,
},
}

if _, err := s3Client.CompleteMultipartUpload(ctx, completeMultipartUploadInput); err != nil {
return err
}

return nil
}

func (s3Client *s3Client) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {