Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to customise S3 SSE on a per-request basis #3783

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 41 additions & 4 deletions pkg/objstore/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"gopkg.in/yaml.v2"
)

type ctxKey int

const (
// DirDelim is the delimiter used to model a directory structure in an object store bucket.
DirDelim = "/"
Expand All @@ -44,6 +46,13 @@ const (

// SSES3 is the name of the SSE-S3 method for objstore encryption.
SSES3 = "SSE-S3"

// sseConfigKey is the context key to override SSE config. This feature is used by downstream
// projects (eg. Cortex) to inject custom SSE config on a per-request basis. Future work or
// refactoring can introduce breaking changes as far as the functionality is preserved.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a comment maybe, a NOTE that this is context value only because it's a very specific S3 option. If SSE is available wider we should probably add a variadic option as (#3784 (comment)) WDYT?

// NOTE: we're using a context value only because it's a very specific S3 option. If SSE will
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// NOTE: we're using a context value only because it's a very specific S3 option. If SSE will
// NOTE: We're using a context value only because it's a very specific S3 option. If SSE will

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a biggy, let's merge.

// be available to wider set of backends we should probably add a variadic option to Get() and Upload().
sseConfigKey = ctxKey(0)
)

var DefaultConfig = Config{
Expand Down Expand Up @@ -147,7 +156,7 @@ type Bucket struct {
logger log.Logger
name string
client *minio.Client
sse encrypt.ServerSide
defaultSSE encrypt.ServerSide
putUserMetadata map[string]string
partSize uint64
listObjectsV1 bool
Expand Down Expand Up @@ -286,7 +295,7 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B
logger: logger,
name: config.Bucket,
client: client,
sse: sse,
defaultSSE: sse,
putUserMetadata: config.PutUserMetadata,
partSize: config.PartSize,
listObjectsV1: config.ListObjectsVersion == "v1",
Expand Down Expand Up @@ -371,7 +380,12 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) err
}

func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
opts := &minio.GetObjectOptions{ServerSideEncryption: b.sse}
sse, err := b.getServerSideEncryption(ctx)
if err != nil {
return nil, err
}

opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
if length != -1 {
if err := opts.SetRange(off, off+length-1); err != nil {
return nil, err
Expand Down Expand Up @@ -423,6 +437,11 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {

// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
sse, err := b.getServerSideEncryption(ctx)
if err != nil {
return err
}

// TODO(https://github.com/thanos-io/thanos/issues/678): Remove guessing length when minio provider will support multipart upload without this.
size, err := objstore.TryToGetSize(r)
if err != nil {
Expand All @@ -443,7 +462,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
size,
minio.PutObjectOptions{
PartSize: partSize,
ServerSideEncryption: b.sse,
ServerSideEncryption: sse,
UserMetadata: b.putUserMetadata,
},
); err != nil {
Expand Down Expand Up @@ -478,6 +497,18 @@ func (b *Bucket) IsObjNotFoundErr(err error) bool {

func (b *Bucket) Close() error { return nil }

// getServerSideEncryption returns the SSE to use.
func (b *Bucket) getServerSideEncryption(ctx context.Context) (encrypt.ServerSide, error) {
if value := ctx.Value(sseConfigKey); value != nil {
if sse, ok := value.(encrypt.ServerSide); ok {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used encrypt.ServerSide instead of SSEConfig in order to avoid having to parse SSEConfig for each request (eg. SSEC reads a file, SSEKMS marshals json).

return sse, nil
}
return nil, errors.New("invalid SSE config override provided in the context")
}

return b.defaultSSE, nil
}

func configFromEnv() Config {
c := Config{
Bucket: os.Getenv("S3_BUCKET"),
Expand Down Expand Up @@ -552,3 +583,9 @@ func NewTestBucketFromConfig(t testing.TB, location string, c Config, reuseBucke
}
}, nil
}

// ContextWithSSEConfig returns a context with a custom SSE config set. The returned context should be
// provided to S3 objstore client functions to override the default SSE config.
func ContextWithSSEConfig(ctx context.Context, value encrypt.ServerSide) context.Context {
return context.WithValue(ctx, sseConfigKey, value)
}
41 changes: 41 additions & 0 deletions pkg/objstore/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
package s3

import (
"context"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/minio/minio-go/v7/pkg/encrypt"

"github.com/thanos-io/thanos/pkg/testutil"
)

Expand Down Expand Up @@ -256,3 +260,40 @@ list_objects_version: "abcd"`)
t.Errorf("parsing of list_objects_version failed: got %v, expected %v", cfg.ListObjectsVersion, "abcd")
}
}

func TestBucket_getServerSideEncryption(t *testing.T) {
// Default config should return no SSE config.
cfg := DefaultConfig
cfg.Endpoint = "localhost:80"
bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test")
testutil.Ok(t, err)

sse, err := bkt.getServerSideEncryption(context.Background())
testutil.Ok(t, err)
testutil.Equals(t, nil, sse)

// If SSE is configured in the client config it should be used.
cfg = DefaultConfig
cfg.Endpoint = "localhost:80"
cfg.SSEConfig = SSEConfig{Type: SSES3}
bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test")
testutil.Ok(t, err)

sse, err = bkt.getServerSideEncryption(context.Background())
testutil.Ok(t, err)
testutil.Equals(t, encrypt.S3, sse.Type())

// If SSE is configured in the context it should win.
cfg = DefaultConfig
cfg.Endpoint = "localhost:80"
cfg.SSEConfig = SSEConfig{Type: SSES3}
override, err := encrypt.NewSSEKMS("test", nil)
testutil.Ok(t, err)

bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test")
testutil.Ok(t, err)

sse, err = bkt.getServerSideEncryption(context.WithValue(context.Background(), sseConfigKey, override))
testutil.Ok(t, err)
testutil.Equals(t, encrypt.KMS, sse.Type())
}