Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

objstore/s3, docs/storage.md: Added 'path' field for S3 objstore #1862

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#1758](https://github.com/thanos-io/thanos/pull/1758) Bucket: `thanos bucket web` now supports `--web.external-prefix` for proxying on a subpath.
- [#1770](https://github.com/thanos-io/thanos/pull/1770) Bucket: Add `--web.prefix-header` flags to allow for bucket UI to be accessible behind a reverse proxy.
- [#1668](https://github.com/thanos-io/thanos/pull/1668) Receiver: Added TLS options for both server and client remote write.
- [#1862](https://github.com/thanos-io/thanos/pull/1862) objstore/s3, docs/storage.md: Added 'path' field for S3 objstore

### Fixed

Expand Down
3 changes: 3 additions & 0 deletions docs/storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ NOTE: Minio client was mainly for AWS S3, but it can be configured against other
type: S3
config:
bucket: ""
path: ""
endpoint: ""
region: ""
access_key: ""
Expand Down Expand Up @@ -107,6 +108,8 @@ Please refer to the documentation of [the Transport type](https://golang.org/pkg

`part_size` is specified in bytes and refers to the minimum file size used for multipart uploads, as some custom S3 implementations may have different requirements. A value of `0` means to use a default 128 MiB size.

When `path` is specified client uses this value as a prefix for a path to the data. Essentially this allows saving the data into the subdirectory.

For debug and testing purposes you can set

* `insecure: true` to switch to plain insecure HTTP instead of HTTPS
Expand Down
43 changes: 29 additions & 14 deletions pkg/objstore/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net"
"net/http"
"os"
"path"
"runtime"
"strconv"
"strings"
Expand Down Expand Up @@ -45,6 +46,7 @@ var DefaultConfig = Config{
// Config stores the configuration for s3 bucket.
type Config struct {
Bucket string `yaml:"bucket"`
Path string `yaml:"path"`
Endpoint string `yaml:"endpoint"`
Region string `yaml:"region"`
AccessKey string `yaml:"access_key"`
Expand Down Expand Up @@ -74,6 +76,7 @@ type HTTPConfig struct {
type Bucket struct {
logger log.Logger
name string
path string
client *minio.Client
sse encrypt.ServerSide
putUserMetadata map[string]string
Expand Down Expand Up @@ -172,10 +175,17 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B
logWriter := log.NewStdlibAdapter(level.Debug(logger), log.MessageKey("s3TraceMsg"))
client.TraceOn(logWriter)
}
var pathprefix string

if config.Path != "" {
level.Info(logger).Log("msg", "data will be stored under the path", "path", config.Path)
pathprefix = path.Clean(config.Path) + DirDelim
}

bkt := &Bucket{
logger: logger,
name: config.Bucket,
path: pathprefix,
client: client,
sse: sse,
putUserMetadata: config.PutUserMetadata,
Expand Down Expand Up @@ -223,21 +233,25 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) err
if dir != "" {
dir = strings.TrimSuffix(dir, DirDelim) + DirDelim
}

for object := range b.client.ListObjects(b.name, dir, false, ctx.Done()) {
for object := range b.client.ListObjects(b.name, path.Join(b.path, dir), false, ctx.Done()) {
// Catch the error when failed to list objects.
if object.Err != nil {
return object.Err
}

key := strings.TrimPrefix(object.Key, b.path)

// This sometimes happens with empty buckets.
if object.Key == "" {
if key == "" {
continue
}

// The s3 client can also return the directory itself in the ListObjects call above.
if object.Key == dir {
if key == dir {
continue
}
if err := f(object.Key); err != nil {

if err := f(key); err != nil {
return err
}
}
Expand All @@ -256,7 +270,7 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (
return nil, err
}
}
r, err := b.client.GetObjectWithContext(ctx, b.name, name, *opts)
r, err := b.client.GetObjectWithContext(ctx, b.name, path.Join(b.path, name), *opts)
if err != nil {
return nil, err
}
Expand All @@ -275,17 +289,17 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (

// Get returns a reader for the given object name.
func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return b.getRange(ctx, name, 0, -1)
return b.getRange(ctx, path.Join(b.path, name), 0, -1)
}

// GetRange returns a new range reader for the given object name and range.
func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
return b.getRange(ctx, name, off, length)
return b.getRange(ctx, path.Join(b.path, name), off, length)
}

// Exists checks if the given object exists.
func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
_, err := b.client.StatObject(b.name, name, minio.StatObjectOptions{})
_, err := b.client.StatObject(b.name, path.Join(b.path, name), minio.StatObjectOptions{})
if err != nil {
if b.IsObjNotFoundErr(err) {
return false, nil
Expand All @@ -302,11 +316,11 @@ func (b *Bucket) guessFileSize(name string, r io.Reader) int64 {
if err == nil {
return fileInfo.Size()
}
level.Warn(b.logger).Log("msg", "could not stat file for multipart upload", "name", name, "err", err)
level.Warn(b.logger).Log("msg", "could not stat file for multipart upload", "name", path.Join(b.path, name), "err", err)
return -1
}

level.Warn(b.logger).Log("msg", "could not guess file size for multipart upload", "name", name)
level.Warn(b.logger).Log("msg", "could not guess file size for multipart upload", "name", path.Join(b.path, name))
return -1
}

Expand All @@ -323,7 +337,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
if _, err := b.client.PutObjectWithContext(
ctx,
b.name,
name,
path.Join(b.path, name),
r,
size,
minio.PutObjectOptions{
Expand All @@ -340,7 +354,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {

// ObjectSize returns the size of the specified object.
func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) {
objInfo, err := b.client.StatObject(b.name, name, minio.StatObjectOptions{})
objInfo, err := b.client.StatObject(b.name, path.Join(b.path, name), minio.StatObjectOptions{})
if err != nil {
return 0, err
}
Expand All @@ -349,7 +363,7 @@ func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) {

// Delete removes the object with the given name.
func (b *Bucket) Delete(ctx context.Context, name string) error {
return b.client.RemoveObject(b.name, name)
return b.client.RemoveObject(b.name, path.Join(b.path, name))
}

// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
Expand All @@ -362,6 +376,7 @@ func (b *Bucket) Close() error { return nil }
func configFromEnv() Config {
c := Config{
Bucket: os.Getenv("S3_BUCKET"),
Path: os.Getenv("S3_PATH"),
Endpoint: os.Getenv("S3_ENDPOINT"),
AccessKey: os.Getenv("S3_ACCESS_KEY"),
SecretKey: os.Getenv("S3_SECRET_KEY"),
Expand Down
17 changes: 17 additions & 0 deletions pkg/objstore/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,21 @@ http_config:
cfg2, err := parseConfig(input2)
testutil.Ok(t, err)
testutil.Assert(t, cfg2.PartSize == 1024*1024*100, "when part size should be set to 100MiB")

input3 := []byte(`bucket: "bucket-name"
endpoint: "s3-endpoint"
access_key: "access_key"
insecure: false
signature_version2: false
encrypt_sse: false
secret_key: "secret_key"
part_size: 104857600
path: thanos
http_config:
insecure_skip_verify: false
idle_conn_timeout: 50s`)

cfg3, err := parseConfig(input3)
testutil.Ok(t, err)
testutil.Assert(t, cfg3.Path == "thanos", "when part size should be set to 'thanos'")
}