From 9bef1fdc38d54cd7de1783631bb445323d27b91b Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Thu, 11 Feb 2021 12:49:01 +0100 Subject: [PATCH 1/2] Allow to customise S3 SSE on a per-request basis Signed-off-by: Marco Pracucci --- pkg/objstore/s3/s3.go | 37 ++++++++++++++++++++++++++++++---- pkg/objstore/s3/s3_test.go | 41 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 4 deletions(-) diff --git a/pkg/objstore/s3/s3.go b/pkg/objstore/s3/s3.go index edbe49d424..21bdcf49dd 100644 --- a/pkg/objstore/s3/s3.go +++ b/pkg/objstore/s3/s3.go @@ -32,6 +32,8 @@ import ( "gopkg.in/yaml.v2" ) +type ctxKey int + const ( // DirDelim is the delimiter used to model a directory structure in an object store bucket. DirDelim = "/" @@ -44,6 +46,11 @@ const ( // SSES3 is the name of the SSE-S3 method for objstore encryption. SSES3 = "SSE-S3" + + // SSEConfigKey is the context key to override SSE config. This feature is used by downstream + // projects (eg. Cortex) to inject custom SSE config on a per-request basis. Future work or + // refactoring can introduce breaking changes as far as the functionality is preserved. + SSEConfigKey = ctxKey(0) ) var DefaultConfig = Config{ @@ -147,7 +154,7 @@ type Bucket struct { logger log.Logger name string client *minio.Client - sse encrypt.ServerSide + defaultSSE encrypt.ServerSide putUserMetadata map[string]string partSize uint64 listObjectsV1 bool @@ -286,7 +293,7 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B logger: logger, name: config.Bucket, client: client, - sse: sse, + defaultSSE: sse, putUserMetadata: config.PutUserMetadata, partSize: config.PartSize, listObjectsV1: config.ListObjectsVersion == "v1", @@ -371,7 +378,12 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) err } func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { - opts := &minio.GetObjectOptions{ServerSideEncryption: b.sse} + sse, err := b.getServerSideEncryption(ctx) + if err != nil { + return nil, err + } + + opts := &minio.GetObjectOptions{ServerSideEncryption: sse} if length != -1 { if err := opts.SetRange(off, off+length-1); err != nil { return nil, err @@ -423,6 +435,11 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) { // Upload the contents of the reader as an object into the bucket. func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { + sse, err := b.getServerSideEncryption(ctx) + if err != nil { + return err + } + // TODO(https://github.com/thanos-io/thanos/issues/678): Remove guessing length when minio provider will support multipart upload without this. size, err := objstore.TryToGetSize(r) if err != nil { @@ -443,7 +460,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { size, minio.PutObjectOptions{ PartSize: partSize, - ServerSideEncryption: b.sse, + ServerSideEncryption: sse, UserMetadata: b.putUserMetadata, }, ); err != nil { @@ -478,6 +495,18 @@ func (b *Bucket) IsObjNotFoundErr(err error) bool { func (b *Bucket) Close() error { return nil } +// getServerSideEncryption returns the SSE to use. +func (b *Bucket) getServerSideEncryption(ctx context.Context) (encrypt.ServerSide, error) { + if value := ctx.Value(SSEConfigKey); value != nil { + if sse, ok := value.(encrypt.ServerSide); ok { + return sse, nil + } + return nil, errors.New("invalid SSE config override provided in the context") + } + + return b.defaultSSE, nil +} + func configFromEnv() Config { c := Config{ Bucket: os.Getenv("S3_BUCKET"), diff --git a/pkg/objstore/s3/s3_test.go b/pkg/objstore/s3/s3_test.go index 4bd8abcd3d..b68b77f4cd 100644 --- a/pkg/objstore/s3/s3_test.go +++ b/pkg/objstore/s3/s3_test.go @@ -4,9 +4,13 @@ package s3 import ( + "context" "testing" "time" + "github.com/go-kit/kit/log" + "github.com/minio/minio-go/v7/pkg/encrypt" + "github.com/thanos-io/thanos/pkg/testutil" ) @@ -256,3 +260,40 @@ list_objects_version: "abcd"`) t.Errorf("parsing of list_objects_version failed: got %v, expected %v", cfg.ListObjectsVersion, "abcd") } } + +func TestBucket_getServerSideEncryption(t *testing.T) { + // Default config should return no SSE config. + cfg := DefaultConfig + cfg.Endpoint = "localhost:80" + bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test") + testutil.Ok(t, err) + + sse, err := bkt.getServerSideEncryption(context.Background()) + testutil.Ok(t, err) + testutil.Equals(t, nil, sse) + + // If SSE is configured in the client config it should be used. + cfg = DefaultConfig + cfg.Endpoint = "localhost:80" + cfg.SSEConfig = SSEConfig{Type: SSES3} + bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test") + testutil.Ok(t, err) + + sse, err = bkt.getServerSideEncryption(context.Background()) + testutil.Ok(t, err) + testutil.Equals(t, encrypt.S3, sse.Type()) + + // If SSE is configured in the context it should win. + cfg = DefaultConfig + cfg.Endpoint = "localhost:80" + cfg.SSEConfig = SSEConfig{Type: SSES3} + override, err := encrypt.NewSSEKMS("test", nil) + testutil.Ok(t, err) + + bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test") + testutil.Ok(t, err) + + sse, err = bkt.getServerSideEncryption(context.WithValue(context.Background(), SSEConfigKey, override)) + testutil.Ok(t, err) + testutil.Equals(t, encrypt.KMS, sse.Type()) +} From bae19b1bc8b630f843ba579cdc4a430a78cbbc62 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Mon, 15 Feb 2021 09:51:36 +0100 Subject: [PATCH 2/2] Addressed review comments Signed-off-by: Marco Pracucci --- pkg/objstore/s3/s3.go | 14 +++++++++++--- pkg/objstore/s3/s3_test.go | 2 +- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/pkg/objstore/s3/s3.go b/pkg/objstore/s3/s3.go index 21bdcf49dd..3c22d456a4 100644 --- a/pkg/objstore/s3/s3.go +++ b/pkg/objstore/s3/s3.go @@ -47,10 +47,12 @@ const ( // SSES3 is the name of the SSE-S3 method for objstore encryption. SSES3 = "SSE-S3" - // SSEConfigKey is the context key to override SSE config. This feature is used by downstream + // sseConfigKey is the context key to override SSE config. This feature is used by downstream // projects (eg. Cortex) to inject custom SSE config on a per-request basis. Future work or // refactoring can introduce breaking changes as far as the functionality is preserved. - SSEConfigKey = ctxKey(0) + // NOTE: we're using a context value only because it's a very specific S3 option. If SSE will + // be available to wider set of backends we should probably add a variadic option to Get() and Upload(). + sseConfigKey = ctxKey(0) ) var DefaultConfig = Config{ @@ -497,7 +499,7 @@ func (b *Bucket) Close() error { return nil } // getServerSideEncryption returns the SSE to use. func (b *Bucket) getServerSideEncryption(ctx context.Context) (encrypt.ServerSide, error) { - if value := ctx.Value(SSEConfigKey); value != nil { + if value := ctx.Value(sseConfigKey); value != nil { if sse, ok := value.(encrypt.ServerSide); ok { return sse, nil } @@ -581,3 +583,9 @@ func NewTestBucketFromConfig(t testing.TB, location string, c Config, reuseBucke } }, nil } + +// ContextWithSSEConfig returns a context with a custom SSE config set. The returned context should be +// provided to S3 objstore client functions to override the default SSE config. +func ContextWithSSEConfig(ctx context.Context, value encrypt.ServerSide) context.Context { + return context.WithValue(ctx, sseConfigKey, value) +} diff --git a/pkg/objstore/s3/s3_test.go b/pkg/objstore/s3/s3_test.go index b68b77f4cd..466c669712 100644 --- a/pkg/objstore/s3/s3_test.go +++ b/pkg/objstore/s3/s3_test.go @@ -293,7 +293,7 @@ func TestBucket_getServerSideEncryption(t *testing.T) { bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test") testutil.Ok(t, err) - sse, err = bkt.getServerSideEncryption(context.WithValue(context.Background(), SSEConfigKey, override)) + sse, err = bkt.getServerSideEncryption(context.WithValue(context.Background(), sseConfigKey, override)) testutil.Ok(t, err) testutil.Equals(t, encrypt.KMS, sse.Type()) }