Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

objstore : implement Aliyun OSS #1234

Closed
wants to merge 48 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
d67a69d
Implemented AliYun OSS for objstore support
Jun 7, 2019
f973be4
restore github.com/improbable-eng/thanos/blob/master/go.mod & go.sum
Jun 7, 2019
4abb631
Implemented AliYun OSS for objstore support following Object Storage->
Jun 7, 2019
4538290
aliyun oss objstore works well
Jun 7, 2019
86c5548
put img under 'img' path
Jun 7, 2019
714b5d3
correct method Iter for pageing view objects
Jun 11, 2019
9dd19b7
Update config.yml
snowbubblyzone Jun 11, 2019
5202497
change storage.md
snowbubblyzone Jun 11, 2019
9b64327
change storage.md
snowbubblyzone Jun 11, 2019
3240df3
Update storage.md
snowbubblyzone Jun 11, 2019
aec4ee4
Update Makefile
snowbubblyzone Jun 11, 2019
1182217
Update config.yml
snowbubblyzone Jun 11, 2019
a827af3
Update config.yml
snowbubblyzone Jun 11, 2019
495908d
Update config.yml
snowbubblyzone Jun 11, 2019
a783bb3
Update oss.go
snowbubblyzone Jun 11, 2019
b47347f
Update config.yml
snowbubblyzone Jun 11, 2019
64f024f
correct access_key_id & access_key_secret
snowbubblyzone Jun 12, 2019
8ac2d43
Update CHANGELOG.md
snowbubblyzone Jun 14, 2019
7e47e41
delete 'make docs' , re-order the method Iter() in oss.go file
snowbubblyzone Jun 14, 2019
3c1f6e1
Update config.yml
snowbubblyzone Jun 14, 2019
d926d6e
Delete .config.yml.swp
snowbubblyzone Jun 14, 2019
2adae0c
Update docs/storage.md
snowbubblyzone Jun 14, 2019
b00d8a5
Update docs/storage.md
snowbubblyzone Jun 14, 2019
e0bd68a
Update pkg/objstore/client/factory.go
snowbubblyzone Jun 14, 2019
ecaa38b
Update proxy.go
snowbubblyzone Jun 14, 2019
0380d47
change object store from 'OSS' to 'AliYunOSS'
snowbubblyzone Jun 14, 2019
b66ad90
Update CHANGELOG.md
snowbubblyzone Jun 17, 2019
04f416c
add 1097 PR in CHANGELOG.md
snowbubblyzone Jun 18, 2019
e895342
add 1097 PR in CHANGELOG.md
snowbubblyzone Jun 18, 2019
a2ad5c5
add #1097 PR in CHANGELOG.md
snowbubblyzone Jun 18, 2019
400a927
Merge branch 'master' into master
snowbubblyzone Jun 18, 2019
1c90eb2
delete select code , rename AccessKeyID , delete
snowbubblyzone Jun 19, 2019
4a6854f
add error wrapper, change AliYunOSS to ALIYUNOSS provider, add @wujin…
snowbubblyzone Jun 21, 2019
b7ba6af
Merge branch 'master' into master
snowbubblyzone Jun 21, 2019
4a14d92
change aliyun oss code to support bucket creating & bucket reusing
snowbubblyzone Jun 25, 2019
dca0fb7
update code for multi-part upload , change improbable-eng to thanos-io
Jul 27, 2019
feed309
update code for multi-part upload , change improbable-eng to thanos-io
Jul 27, 2019
1091424
Merge branch 'master' into master
snowbubblyzone Jul 27, 2019
540fc5b
Update main.go
snowbubblyzone Jul 27, 2019
c1bab13
Update pool_test.go
snowbubblyzone Jul 27, 2019
f6f50d3
Update handler.go
snowbubblyzone Jul 27, 2019
1130f3d
Update proxy.go
snowbubblyzone Jul 27, 2019
656e1fb
Update bucket.go
snowbubblyzone Jul 27, 2019
037f411
Update oss.go
snowbubblyzone Jul 27, 2019
0f89466
Update oss.go
snowbubblyzone Jul 27, 2019
efa50b1
Update bucket.go
snowbubblyzone Jul 27, 2019
04c9098
update code for multi-part upload
Jul 27, 2019
6b6a70c
remove blank line , add PR to CHANGELOG.md
Aug 2, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cmd/thanos/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ func regCommonObjStoreFlags(cmd *kingpin.CmdClause, suffix string, required bool
}
}


func regCommonTracingFlags(app *kingpin.Application) *pathOrContent {
fileFlagName := fmt.Sprintf("tracing.config-file")
contentFlagName := fmt.Sprintf("tracing.config")
Expand Down
Binary file added docs/img/foreachtest.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/img/thanos.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 4 additions & 0 deletions pkg/objstore/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/improbable-eng/thanos/pkg/objstore/azure"
"github.com/improbable-eng/thanos/pkg/objstore/cos"
"github.com/improbable-eng/thanos/pkg/objstore/gcs"
"github.com/improbable-eng/thanos/pkg/objstore/oss"
"github.com/improbable-eng/thanos/pkg/objstore/s3"
"github.com/improbable-eng/thanos/pkg/objstore/swift"
"github.com/pkg/errors"
Expand All @@ -26,6 +27,7 @@ const (
AZURE ObjProvider = "AZURE"
SWIFT ObjProvider = "SWIFT"
COS ObjProvider = "COS"
OSS ObjProvider = "OSS"
This conversation was marked as resolved.
Show resolved Hide resolved
)

type BucketConfig struct {
Expand Down Expand Up @@ -59,6 +61,8 @@ func NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registe
bucket, err = swift.NewContainer(logger, config)
case string(COS):
bucket, err = cos.NewBucket(logger, config, component)
case string(OSS):
bucket, err = oss.NewBucket(logger, config, component)
default:
return nil, errors.Errorf("bucket with type %s is not supported", bucketConf.Type)
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/objstore/objtesting/foreach.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/improbable-eng/thanos/pkg/objstore/cos"
"github.com/improbable-eng/thanos/pkg/objstore/gcs"
"github.com/improbable-eng/thanos/pkg/objstore/inmem"
"github.com/improbable-eng/thanos/pkg/objstore/oss"
"github.com/improbable-eng/thanos/pkg/objstore/s3"
"github.com/improbable-eng/thanos/pkg/objstore/swift"
"github.com/improbable-eng/thanos/pkg/testutil"
Expand Down Expand Up @@ -117,4 +118,20 @@ func ForeachStore(t *testing.T, testFn func(t testing.TB, bkt objstore.Bucket))
} else {
t.Log("THANOS_SKIP_TENCENT_COS_TESTS envvar present. Skipping test against Tencent COS.")
}

// Optional OSS.
if _, ok := os.LookupEnv("THANOS_SKIP_ALIYUN_OSS_TESTS"); !ok {
bkt, closeFn, err := oss.NewTestBucket(t, "e2e-test")
testutil.Ok(t, err)

ok := t.Run("AliYun oss", func(t *testing.T) {
testFn(t, bkt)
})
closeFn()
if !ok {
return
}
} else {
t.Log("THANOS_SKIP_ALIYUN_OSS_TESTS envvar present. Skipping test against AliYun OSS.")
}
}
302 changes: 302 additions & 0 deletions pkg/objstore/oss/oss.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
package oss

import (
"context"
"fmt"
"io"
"math/rand"
"os"
"strings"
"testing"
"time"

alioss "github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/pkg/errors"
yaml "gopkg.in/yaml.v2"
)

type Config struct {
EndPoint string `yaml:"endpoint"`
Bucket string `yaml:"bucket"`
SecretKey string `yaml:"secret_key"`
SecretId string `yaml:"secret_id"`
}

type objectInfo struct {
key string
err error
}

type Bucket struct {
name string
logger log.Logger
client *alioss.Client
config Config
}

func NewTestBucket(t testing.TB, location string) (objstore.Bucket, func(), error) {
c := configFromEnv()
if err := ValidateForTests(c); err != nil {
return nil, nil, err
}
if c.Bucket != "" && os.Getenv("THANOS_ALLOW_EXISTING_BUCKET_USE") == "" {
return nil, nil, errors.New("OSS_BUCKET is defined. Normally this tests will create temporary bucket " +
"and delete it after test. Unset OSS_BUCKET env variable to use default logic. If you really want to run " +
"tests against provided (NOT USED!) bucket, set THANOS_ALLOW_EXISTING_BUCKET_USE=true. WARNING: That bucket " +
"needs to be manually cleared. This means that it is only useful to run one test in a time. This is due " +
"to safety (accidentally pointing prod bucket for test) as well as aliyun oss not being fully strong consistent.")
This conversation was marked as resolved.
Show resolved Hide resolved
}

return NewTestBucketFromConfig(t, location, c, true)
}

// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
bucket, err := b.client.Bucket(b.config.Bucket)
This conversation was marked as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Wrap(err, "oss bucket")
}
if err2 := bucket.PutObject(name, r); err != nil {
return errors.Wrap(err2, "upload oss object")
}
return nil
}

// Delete removes the object with the given name.
func (b *Bucket) Delete(ctx context.Context, name string) error {
bucket, err := b.client.Bucket(b.config.Bucket)
if err != nil {
return errors.Wrap(err, "oss bucket")
}
if err2 := bucket.DeleteObject(name); err != nil {
return errors.Wrap(err2, "delete oss object")
}
return nil
}

func configFromEnv() Config {
c := Config{
EndPoint: os.Getenv("OSS_ENDPOINT"),
Bucket: os.Getenv("OSS_BUCKET"),
SecretKey: os.Getenv("OSS_SECRET_KEY"),
SecretId: os.Getenv("OSS_SECRET_ID"),
}

return c
}

// ValidateForTests checks to see the config options for tests are set.
func ValidateForTests(conf Config) error {
This conversation was marked as resolved.
Show resolved Hide resolved
if conf.EndPoint == "" ||
conf.SecretId == "" ||
conf.SecretKey == "" {
return errors.New("insufficient oss test configuration information")
}
return nil
}

func parseConfig(conf []byte) (Config, error) {
This conversation was marked as resolved.
Show resolved Hide resolved
var config Config
if err := yaml.Unmarshal(conf, &config); err != nil {
return Config{}, err
}

return config, nil
}

func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) {
config, err := parseConfig(conf)
if err != nil {
return nil, err
This conversation was marked as resolved.
Show resolved Hide resolved
}
client, err := alioss.New(config.EndPoint, config.SecretId, config.SecretKey)
This conversation was marked as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
This conversation was marked as resolved.
Show resolved Hide resolved
}
bkt := &Bucket{
logger: logger,
client: client,
name: config.Bucket,
config: config,
}
return bkt, nil
}

// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
This conversation was marked as resolved.
Show resolved Hide resolved
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) error {
if dir != "" {
dir = strings.TrimSuffix(dir, objstore.DirDelim) + objstore.DirDelim
}

bucket, err := b.client.Bucket(b.config.Bucket)
if err != nil {
return errors.Wrap(err, "oss bucket")
}
objects, err2 := bucket.ListObjects(alioss.Prefix(dir), alioss.Delimiter(objstore.DirDelim))
This conversation was marked as resolved.
Show resolved Hide resolved
if err2 != nil {
return errors.Wrap(err2, "oss bucket list")
}
for _, object := range objects.Objects {

if object.Key == "" {
continue
}
if err := f(object.Key); err != nil {
return err
}
}
for _, object := range objects.CommonPrefixes {
if err := f(object); err != nil {
return err
}
}

return nil
}

func (b *Bucket) Name() string {
return b.name
}

// validate checks to see the config options are set.
func validate(conf Config) error {
if conf.EndPoint == "" {
return errors.New("no oss endpoint in config file")
This conversation was marked as resolved.
Show resolved Hide resolved
}

if conf.SecretId == "" && conf.SecretKey != "" {
return errors.New("no oss acccess_key specified while secret_key is present in config file; both of them should be present in config.")
}
This conversation was marked as resolved.
Show resolved Hide resolved

if conf.SecretId != "" && conf.SecretKey == "" {
return errors.New("no oss secret_key specified while access_key is present in config file; both of them should be present in config.")
}
return nil
}

func NewTestBucketFromConfig(t testing.TB, location string, c Config, reuseBucket bool) (objstore.Bucket, func(), error) {
bc, err := yaml.Marshal(c)
if err != nil {
return nil, nil, err
}
b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test")
if err != nil {
return nil, nil, err
}

bktToCreate := c.Bucket
if c.Bucket != "" && reuseBucket {
if err := b.Iter(context.Background(), "", func(f string) error {
return errors.Errorf("bucket %s is not empty", c.Bucket)
}); err != nil {
return nil, nil, errors.Wrapf(err, "oss check bucket %s", c.Bucket)
}

t.Log("WARNING. Reusing", c.Bucket, "Aliyun OSS bucket for OSS tests. Manual cleanup afterwards is required")
return b, func() {}, nil
}

if c.Bucket == "" {
src := rand.NewSource(time.Now().UnixNano())

bktToCreate = strings.Replace(fmt.Sprintf("test_%s_%x", strings.ToLower(t.Name()), src.Int63()), "_", "-", -1)
if len(bktToCreate) >= 63 {
bktToCreate = bktToCreate[:63]
}
}

if err := b.client.CreateBucket(bktToCreate, alioss.ACL(alioss.ACLPrivate), alioss.StorageClass(alioss.StorageArchive)); err != nil {
This conversation was marked as resolved.
Show resolved Hide resolved
return nil, nil, err
This conversation was marked as resolved.
Show resolved Hide resolved
}
b.name = bktToCreate
t.Log("created temporary Aliyun bucket for Aliyun tests with name: %s @ %s ", bktToCreate, location)

return b, func() {
objstore.EmptyBucket(t, context.Background(), b)
if err := b.client.DeleteBucket(bktToCreate); err != nil {
t.Logf("deleting bucket %s failed: %s", bktToCreate, err)
}
}, nil
}

func (b *Bucket) Close() error { return nil }

func setRange(opts *[]alioss.Option, start, end int64) error {
if 0 <= start && start <= end {
*opts = []alioss.Option{alioss.Range(start, end)}
This conversation was marked as resolved.
Show resolved Hide resolved
} else {
return errors.Errorf("Invalid range specified: start=%d end=%d", start, end)
}
return nil
}

func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
if len(name) == 0 {
return nil, errors.Errorf("given object name should not empty")
This conversation was marked as resolved.
Show resolved Hide resolved
}

var opts []alioss.Option
if length != -1 {
if err := setRange(&opts, off, off+length-1); err != nil {
return nil, err
}
}

bucket, err := b.client.Bucket(b.config.Bucket)
if err != nil {
return nil, err
}

resp, err2 := bucket.GetObject(name, opts...)
if err2 != nil {
return nil, err2
}

if _, err := resp.Read(nil); err != nil {
runutil.CloseWithLogOnErr(b.logger, resp, "oss get range obj close")
return nil, err
}

return resp, nil
}

// Get returns a reader for the given object name.
func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return b.getRange(ctx, name, 0, -1)
}

func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
return b.getRange(ctx, name, off, length)
}

// Exists checks if the given object exists in the bucket.
func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
bucket, err := b.client.Bucket(b.config.Bucket)
if err != nil {
return false, errors.Wrap(err, "oss bucket")
}
exists, err := bucket.IsObjectExist(name)
if err != nil {
if b.IsObjNotFoundErr(err) {
return false, nil
}
return false, errors.Wrap(err, "head oss object")
This conversation was marked as resolved.
Show resolved Hide resolved
}

return exists, nil
}

// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
func (b *Bucket) IsObjNotFoundErr(err error) bool {
switch err.(type) {
This conversation was marked as resolved.
Show resolved Hide resolved
case alioss.ServiceError:
if err.(alioss.ServiceError).StatusCode == 404 {
This conversation was marked as resolved.
Show resolved Hide resolved
return true
}
}
return false
}
3 changes: 2 additions & 1 deletion pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package store
import (
"context"
"fmt"
"github.com/improbable-eng/thanos/pkg/tracing"
"io"
"math"
"strings"
"sync"
"time"

"github.com/improbable-eng/thanos/pkg/tracing"

This conversation was marked as resolved.
Show resolved Hide resolved
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
Expand Down
2 changes: 1 addition & 1 deletion pkg/tracing/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"io"
"strings"

"github.com/prometheus/client_golang/prometheus"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/tracing/jaeger"
"github.com/improbable-eng/thanos/pkg/tracing/stackdriver"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/yaml.v2"
)

Expand Down
Loading