Skip to content

Commit

Permalink
Allow to customise S3 SSE on a per-request basis
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Pracucci <[email protected]>
  • Loading branch information
pracucci committed Feb 11, 2021
1 parent 7b09e30 commit eff5b8b
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 4 deletions.
37 changes: 33 additions & 4 deletions pkg/objstore/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"gopkg.in/yaml.v2"
)

type ctxKey int

const (
// DirDelim is the delimiter used to model a directory structure in an object store bucket.
DirDelim = "/"
Expand All @@ -44,6 +46,11 @@ const (

// SSES3 is the name of the SSE-S3 method for objstore encryption.
SSES3 = "SSE-S3"

// SSEConfigKey is the context key to override SSE config. This feature is used by downstream
// projects (eg. Cortex) to inject custom SSE config on a per-request basis. Future work or
// refactoring can introduce breaking changes as far as the functionality is preserved.
SSEConfigKey = ctxKey(0)
)

var DefaultConfig = Config{
Expand Down Expand Up @@ -147,7 +154,7 @@ type Bucket struct {
logger log.Logger
name string
client *minio.Client
sse encrypt.ServerSide
defaultSSE encrypt.ServerSide
putUserMetadata map[string]string
partSize uint64
listObjectsV1 bool
Expand Down Expand Up @@ -286,7 +293,7 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B
logger: logger,
name: config.Bucket,
client: client,
sse: sse,
defaultSSE: sse,
putUserMetadata: config.PutUserMetadata,
partSize: config.PartSize,
listObjectsV1: config.ListObjectsVersion == "v1",
Expand Down Expand Up @@ -371,7 +378,12 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) err
}

func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
opts := &minio.GetObjectOptions{ServerSideEncryption: b.sse}
sse, err := b.getServerSideEncryption(ctx)
if err != nil {
return nil, err
}

opts := &minio.GetObjectOptions{ServerSideEncryption: sse}
if length != -1 {
if err := opts.SetRange(off, off+length-1); err != nil {
return nil, err
Expand Down Expand Up @@ -423,6 +435,11 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {

// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
sse, err := b.getServerSideEncryption(ctx)
if err != nil {
return err
}

// TODO(https://github.com/thanos-io/thanos/issues/678): Remove guessing length when minio provider will support multipart upload without this.
size, err := objstore.TryToGetSize(r)
if err != nil {
Expand All @@ -443,7 +460,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
size,
minio.PutObjectOptions{
PartSize: partSize,
ServerSideEncryption: b.sse,
ServerSideEncryption: sse,
UserMetadata: b.putUserMetadata,
},
); err != nil {
Expand Down Expand Up @@ -478,6 +495,18 @@ func (b *Bucket) IsObjNotFoundErr(err error) bool {

func (b *Bucket) Close() error { return nil }

// getServerSideEncryption returns the SSE to use.
func (b *Bucket) getServerSideEncryption(ctx context.Context) (encrypt.ServerSide, error) {
if value := ctx.Value(SSEConfigKey); value != nil {
if sse, ok := value.(encrypt.ServerSide); ok {
return sse, nil
}
return nil, errors.New("invalid SSE config override provided in the context")
}

return b.defaultSSE, nil
}

func configFromEnv() Config {
c := Config{
Bucket: os.Getenv("S3_BUCKET"),
Expand Down
41 changes: 41 additions & 0 deletions pkg/objstore/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
package s3

import (
"context"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/minio/minio-go/v7/pkg/encrypt"

"github.com/thanos-io/thanos/pkg/testutil"
)

Expand Down Expand Up @@ -256,3 +260,40 @@ list_objects_version: "abcd"`)
t.Errorf("parsing of list_objects_version failed: got %v, expected %v", cfg.ListObjectsVersion, "abcd")
}
}

func TestBucket_getServerSideEncryption(t *testing.T) {
// Default config should return no SSE config.
cfg := DefaultConfig
cfg.Endpoint = "localhost:80"
bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test")
testutil.Ok(t, err)

sse, err := bkt.getServerSideEncryption(context.Background())
testutil.Ok(t, err)
testutil.Equals(t, nil, sse)

// If SSE is configured in the client config it should be used.
cfg = DefaultConfig
cfg.Endpoint = "localhost:80"
cfg.SSEConfig = SSEConfig{Type: SSES3}
bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test")
testutil.Ok(t, err)

sse, err = bkt.getServerSideEncryption(context.Background())
testutil.Ok(t, err)
testutil.Equals(t, encrypt.S3, sse.Type())

// If SSE is configured in the context it should win.
cfg = DefaultConfig
cfg.Endpoint = "localhost:80"
cfg.SSEConfig = SSEConfig{Type: SSES3}
override, err := encrypt.NewSSEKMS("test", nil)
testutil.Ok(t, err)

bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test")
testutil.Ok(t, err)

sse, err = bkt.getServerSideEncryption(context.WithValue(context.Background(), SSEConfigKey, override))
testutil.Ok(t, err)
testutil.Equals(t, encrypt.KMS, sse.Type())
}

0 comments on commit eff5b8b

Please sign in to comment.