diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go
index a9930139959..3bcc94a3ab3 100644
--- a/cmd/thanos/main_test.go
+++ b/cmd/thanos/main_test.go
@@ -22,7 +22,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
- "github.com/thanos-io/thanos/pkg/objstore/inmem"
+ "github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)
@@ -36,7 +36,7 @@ func TestCleanupIndexCacheFolder(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
- bkt := inmem.NewBucket()
+ bkt := objstore.NoopInstrumentedBucket{objstore.NewInMemBucket()}
// Upload one compaction lvl = 2 block, one compaction lvl = 1.
// We generate index cache files only for lvl > 1 blocks.
@@ -97,7 +97,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
- bkt := inmem.NewBucket()
+ bkt := objstore.NoopInstrumentedBucket{objstore.NewInMemBucket()}
var id ulid.ULID
{
id, err = e2eutil.CreateBlock(
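The tests above now build their buckets from the consolidated objstore package: NewInMemBucket replaces the old inmem package, and NoopInstrumentedBucket (added in pkg/objstore/testing.go further down in this diff) wraps it to satisfy the InstrumentedBucket interface that the fetcher APIs now require. A minimal sketch of that pattern, using only types introduced by this diff (written with a keyed literal for clarity):

package example

import "github.com/thanos-io/thanos/pkg/objstore"

// newTestBucket shows the wrapping pattern used by the updated tests: the
// noop wrapper ignores any expected-errors filter and returns itself, so
// tests get the InstrumentedBucket surface without real instrumentation.
func newTestBucket() objstore.InstrumentedBucket {
	return objstore.NoopInstrumentedBucket{Bucket: objstore.NewInMemBucket()}
}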
diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go
index 6e2193b5d7a..503d09db75d 100644
--- a/pkg/block/block_test.go
+++ b/pkg/block/block_test.go
@@ -19,7 +19,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/block/metadata"
- "github.com/thanos-io/thanos/pkg/objstore/inmem"
+ "github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
@@ -81,7 +81,7 @@ func TestUpload(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()
- bkt := inmem.NewBucket()
+ bkt := objstore.NewInMemBucket()
b1, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
@@ -185,7 +185,7 @@ func TestDelete(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()
- bkt := inmem.NewBucket()
+ bkt := objstore.NewInMemBucket()
{
b1, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
@@ -232,7 +232,7 @@ func TestMarkForDeletion(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()
- bkt := inmem.NewBucket()
+ bkt := objstore.NewInMemBucket()
{
blockWithoutDeletionMark, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go
index e83bc5633dd..0a8ca111966 100644
--- a/pkg/block/fetcher.go
+++ b/pkg/block/fetcher.go
@@ -142,7 +142,7 @@ type MetadataModifier interface {
type BaseFetcher struct {
logger log.Logger
concurrency int
- bkt objstore.BucketReader
+ bkt objstore.InstrumentedBucketReader
// Optional local directory to cache meta.json files.
cacheDir string
@@ -152,7 +152,7 @@ type BaseFetcher struct {
}
// NewBaseFetcher constructs BaseFetcher.
-func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReader, dir string, reg prometheus.Registerer) (*BaseFetcher, error) {
+func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, dir string, reg prometheus.Registerer) (*BaseFetcher, error) {
if logger == nil {
logger = log.NewNopLogger()
}
@@ -180,7 +180,7 @@ func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReade
}
// NewMetaFetcher returns meta fetcher.
-func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReader, dir string, reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier) (*MetaFetcher, error) {
+func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, dir string, reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier) (*MetaFetcher, error) {
b, err := NewBaseFetcher(logger, concurrency, bkt, dir, reg)
if err != nil {
return nil, err
@@ -236,7 +236,7 @@ func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met
}
}
- r, err := f.bkt.Get(ctx, metaFile)
+ r, err := f.bkt.ReaderWithExpectedErrs(f.bkt.IsObjNotFoundErr).Get(ctx, metaFile)
if f.bkt.IsObjNotFoundErr(err) {
// Meta.json was deleted between bkt.Exists and here.
return nil, errors.Wrapf(ErrorSyncMetaNotFound, "%v", err)
@@ -740,12 +740,12 @@ func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.UL
type IgnoreDeletionMarkFilter struct {
logger log.Logger
delay time.Duration
- bkt objstore.BucketReader
+ bkt objstore.InstrumentedBucketReader
deletionMarkMap map[ulid.ULID]*metadata.DeletionMark
}
// NewIgnoreDeletionMarkFilter creates IgnoreDeletionMarkFilter.
-func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.BucketReader, delay time.Duration) *IgnoreDeletionMarkFilter {
+func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, delay time.Duration) *IgnoreDeletionMarkFilter {
return &IgnoreDeletionMarkFilter{
logger: logger,
bkt: bkt,
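The loadMeta change above is the core pattern of this PR: reads that may legitimately hit a missing object are issued through a reader view that treats not-found as expected, so thanos_objstore_bucket_operation_failures_total only counts genuinely suspicious errors. A sketch of the call pattern with a hypothetical helper name, using only the interfaces this diff adds to pkg/objstore:

package example

import (
	"context"
	"io/ioutil"

	"github.com/thanos-io/thanos/pkg/objstore"
)

// readOptional mirrors BaseFetcher.loadMeta: a missing object is a normal
// outcome, so the read goes through a view whose not-found errors do not
// bump the operation failures metric.
func readOptional(ctx context.Context, bkt objstore.InstrumentedBucketReader, name string) ([]byte, bool, error) {
	r, err := bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr).Get(ctx, name)
	if bkt.IsObjNotFoundErr(err) {
		return nil, false, nil // Expected: the object simply does not exist.
	}
	if err != nil {
		return nil, false, err // Unexpected: already counted as a failure.
	}
	defer func() { _ = r.Close() }()
	b, err := ioutil.ReadAll(r)
	return b, true, err
}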
diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go
index 953ed93f4fe..23ba8b3b16c 100644
--- a/pkg/block/fetcher_test.go
+++ b/pkg/block/fetcher_test.go
@@ -75,7 +75,7 @@ func TestMetaFetcher_Fetch(t *testing.T) {
var ulidToDelete ulid.ULID
r := prometheus.NewRegistry()
- baseFetcher, err := NewBaseFetcher(log.NewNopLogger(), 20, bkt, dir, r)
+ baseFetcher, err := NewBaseFetcher(log.NewNopLogger(), 20, objstore.NoopInstrumentedBucket{bkt}, dir, r)
testutil.Ok(t, err)
fetcher := baseFetcher.NewMetaFetcher(r, []MetadataFilter{
@@ -1065,7 +1065,7 @@ func TestIgnoreDeletionMarkFilter_Filter(t *testing.T) {
now := time.Now()
f := &IgnoreDeletionMarkFilter{
logger: log.NewNopLogger(),
- bkt: bkt,
+ bkt: objstore.NoopInstrumentedBucket{bkt},
delay: 48 * time.Hour,
}
diff --git a/pkg/block/indexheader/json_reader.go b/pkg/block/indexheader/json_reader.go
index 5a3654f4d60..fe54534972f 100644
--- a/pkg/block/indexheader/json_reader.go
+++ b/pkg/block/indexheader/json_reader.go
@@ -199,7 +199,7 @@ type JSONReader struct {
}
// NewJSONReader loads or builds new index-cache.json if not present on disk or object storage.
-func NewJSONReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID) (*JSONReader, error) {
+func NewJSONReader(ctx context.Context, logger log.Logger, bkt objstore.InstrumentedBucketReader, dir string, id ulid.ULID) (*JSONReader, error) {
cachefn := filepath.Join(dir, id.String(), block.IndexCacheFilename)
jr, err := newFileJSONReader(logger, cachefn)
if err == nil {
@@ -216,7 +216,7 @@ func NewJSONReader(ctx context.Context, logger log.Logger, bkt objstore.BucketRe
}
// Try to download index cache file from object store.
- if err = objstore.DownloadFile(ctx, logger, bkt, filepath.Join(id.String(), block.IndexCacheFilename), cachefn); err == nil {
+ if err = objstore.DownloadFile(ctx, logger, bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr), filepath.Join(id.String(), block.IndexCacheFilename), cachefn); err == nil {
return newFileJSONReader(logger, cachefn)
}
diff --git a/pkg/block/metadata/deletionmark.go b/pkg/block/metadata/deletionmark.go
index bda97e92c6c..5f2a9f04adc 100644
--- a/pkg/block/metadata/deletionmark.go
+++ b/pkg/block/metadata/deletionmark.go
@@ -45,10 +45,10 @@ type DeletionMark struct {
}
// ReadDeletionMark reads the given deletion mark file from <dir>/deletion-mark.json in bucket.
-func ReadDeletionMark(ctx context.Context, bkt objstore.BucketReader, logger log.Logger, dir string) (*DeletionMark, error) {
+func ReadDeletionMark(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, dir string) (*DeletionMark, error) {
deletionMarkFile := path.Join(dir, DeletionMarkFilename)
- r, err := bkt.Get(ctx, deletionMarkFile)
+ r, err := bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr).Get(ctx, deletionMarkFile)
if err != nil {
if bkt.IsObjNotFoundErr(err) {
return nil, ErrorDeletionMarkNotFound
diff --git a/pkg/block/metadata/deletionmark_test.go b/pkg/block/metadata/deletionmark_test.go
index 73f632e9af4..4afcfd945af 100644
--- a/pkg/block/metadata/deletionmark_test.go
+++ b/pkg/block/metadata/deletionmark_test.go
@@ -16,7 +16,7 @@ import (
"github.com/fortytw2/leaktest"
"github.com/oklog/ulid"
"github.com/pkg/errors"
- "github.com/thanos-io/thanos/pkg/objstore/inmem"
+ "github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
)
@@ -29,7 +29,7 @@ func TestReadDeletionMark(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()
- bkt := inmem.NewBucket()
+ bkt := objstore.NoopInstrumentedBucket{objstore.NewInMemBucket()}
{
blockWithoutDeletionMark := ulid.MustNew(uint64(1), nil)
_, err := ReadDeletionMark(ctx, bkt, nil, path.Join(tmpDir, blockWithoutDeletionMark.String()))
diff --git a/pkg/compact/clean_test.go b/pkg/compact/clean_test.go
index 9a7889ead8e..cd05988e1e8 100644
--- a/pkg/compact/clean_test.go
+++ b/pkg/compact/clean_test.go
@@ -18,7 +18,7 @@ import (
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
- "github.com/thanos-io/thanos/pkg/objstore/inmem"
+ "github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
)
@@ -26,7 +26,7 @@ func TestBestEffortCleanAbortedPartialUploads(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
- bkt := inmem.NewBucket()
+ bkt := objstore.NoopInstrumentedBucket{objstore.NewInMemBucket()}
logger := log.NewNopLogger()
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil)
diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go
index 21de5cfca76..3f98faea72e 100644
--- a/pkg/compact/compact_e2e_test.go
+++ b/pkg/compact/compact_e2e_test.go
@@ -90,7 +90,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
}
duplicateBlocksFilter := block.NewDeduplicateFilter()
- metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, []block.MetadataFilter{
+ metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.NoopInstrumentedBucket{bkt}, "", nil, []block.MetadataFilter{
duplicateBlocksFilter,
}, nil)
testutil.Ok(t, err)
@@ -176,9 +176,9 @@ func TestGroup_Compact_e2e(t *testing.T) {
reg := prometheus.NewRegistry()
- ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, 48*time.Hour)
+ ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, objstore.NoopInstrumentedBucket{bkt}, 48*time.Hour)
duplicateBlocksFilter := block.NewDeduplicateFilter()
- metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, []block.MetadataFilter{
+ metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.NoopInstrumentedBucket{bkt}, "", nil, []block.MetadataFilter{
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
}, nil)
diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go
index 0a1431996e8..4aeaeea6746 100644
--- a/pkg/compact/retention_test.go
+++ b/pkg/compact/retention_test.go
@@ -22,7 +22,6 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/objstore"
- "github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/testutil"
)
@@ -240,7 +239,7 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
},
} {
t.Run(tt.name, func(t *testing.T) {
- bkt := inmem.NewBucket()
+ bkt := objstore.NoopInstrumentedBucket{objstore.NewInMemBucket()}
for _, b := range tt.blocks {
uploadMockBlock(t, bkt, b.id, b.minTime, b.maxTime, int64(b.resolution))
}
diff --git a/pkg/objstore/azure/azure.go b/pkg/objstore/azure/azure.go
index 0c132daf61b..97603abdf18 100644
--- a/pkg/objstore/azure/azure.go
+++ b/pkg/objstore/azure/azure.go
@@ -64,7 +64,7 @@ func (conf *Config) validate() error {
return nil
}
// NewBucket returns a new Bucket using the provided Azure config.
func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket, error) {
level.Debug(logger).Log("msg", "creating new Azure bucket connection", "component", component)
diff --git a/pkg/objstore/client/factory.go b/pkg/objstore/client/factory.go
index 6caf7f8ee43..687cf61ec46 100644
--- a/pkg/objstore/client/factory.go
+++ b/pkg/objstore/client/factory.go
@@ -40,9 +40,9 @@ type BucketConfig struct {
Config interface{} `yaml:"config"`
}
// NewBucket initializes and returns new object storage clients.
// NOTE: confContentYaml can contain secrets.
-func NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registerer, component string) (objstore.Bucket, error) {
+func NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registerer, component string) (objstore.InstrumentedBucket, error) {
level.Info(logger).Log("msg", "loading bucket configuration")
bucketConf := &BucketConfig{}
if err := yaml.UnmarshalStrict(confContentYaml, bucketConf); err != nil {
diff --git a/pkg/objstore/filesystem/filesystem.go b/pkg/objstore/filesystem/filesystem.go
index 0a13994d938..6965ee64494 100644
--- a/pkg/objstore/filesystem/filesystem.go
+++ b/pkg/objstore/filesystem/filesystem.go
@@ -43,7 +43,7 @@ func NewBucketFromConfig(conf []byte) (*Bucket, error) {
return NewBucket(c.Directory)
}
// NewBucket returns a new filesystem.Bucket.
func NewBucket(rootDir string) (*Bucket, error) {
absDir, err := filepath.Abs(rootDir)
if err != nil {
diff --git a/pkg/objstore/gcs/gcs.go b/pkg/objstore/gcs/gcs.go
index ae2cf3202bb..e23a59077c5 100644
--- a/pkg/objstore/gcs/gcs.go
+++ b/pkg/objstore/gcs/gcs.go
@@ -41,7 +41,7 @@ type Bucket struct {
closer io.Closer
}
// NewBucket returns a new Bucket against the given bucket handle.
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string) (*Bucket, error) {
var gc Config
if err := yaml.Unmarshal(conf, &gc); err != nil {
diff --git a/pkg/objstore/inmem/inmem.go b/pkg/objstore/inmem.go
similarity index 72%
rename from pkg/objstore/inmem/inmem.go
rename to pkg/objstore/inmem.go
index 7f430c3c97b..68f1c1cd199 100644
--- a/pkg/objstore/inmem/inmem.go
+++ b/pkg/objstore/inmem.go
@@ -1,7 +1,7 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
-package inmem
+package objstore
import (
"bytes"
@@ -13,37 +13,36 @@ import (
"sync"
"github.com/pkg/errors"
- "github.com/thanos-io/thanos/pkg/objstore"
)
var errNotFound = errors.New("inmem: object not found")
// Bucket implements the objstore.Bucket interfaces against local memory.
// Methods from Bucket interface are thread-safe. Objects are assumed to be immutable.
-type Bucket struct {
+type InMemBucket struct {
mtx sync.RWMutex
objects map[string][]byte
}
-// NewBucket returns a new in memory Bucket.
+// NewInMemBucket returns a new in memory Bucket.
// NOTE: Returned bucket is just a naive in memory bucket implementation. For test use cases only.
-func NewBucket() *Bucket {
- return &Bucket{objects: map[string][]byte{}}
+func NewInMemBucket() *InMemBucket {
+ return &InMemBucket{objects: map[string][]byte{}}
}
// Objects returns internally stored objects.
// NOTE: For assert purposes.
-func (b *Bucket) Objects() map[string][]byte {
+func (b *InMemBucket) Objects() map[string][]byte {
return b.objects
}
// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
-func (b *Bucket) Iter(_ context.Context, dir string, f func(string) error) error {
+func (b *InMemBucket) Iter(_ context.Context, dir string, f func(string) error) error {
unique := map[string]struct{}{}
var dirPartsCount int
- dirParts := strings.SplitAfter(dir, objstore.DirDelim)
+ dirParts := strings.SplitAfter(dir, DirDelim)
for _, p := range dirParts {
if p == "" {
continue
@@ -57,7 +56,7 @@ func (b *Bucket) Iter(_ context.Context, dir string, f func(string) error) error
continue
}
- parts := strings.SplitAfter(filename, objstore.DirDelim)
+ parts := strings.SplitAfter(filename, DirDelim)
unique[strings.Join(parts[:dirPartsCount+1], "")] = struct{}{}
}
b.mtx.RUnlock()
@@ -67,13 +66,13 @@ func (b *Bucket) Iter(_ context.Context, dir string, f func(string) error) error
keys = append(keys, n)
}
sort.Slice(keys, func(i, j int) bool {
- if strings.HasSuffix(keys[i], objstore.DirDelim) && strings.HasSuffix(keys[j], objstore.DirDelim) {
+ if strings.HasSuffix(keys[i], DirDelim) && strings.HasSuffix(keys[j], DirDelim) {
return strings.Compare(keys[i], keys[j]) < 0
}
- if strings.HasSuffix(keys[i], objstore.DirDelim) {
+ if strings.HasSuffix(keys[i], DirDelim) {
return false
}
- if strings.HasSuffix(keys[j], objstore.DirDelim) {
+ if strings.HasSuffix(keys[j], DirDelim) {
return true
}
@@ -89,7 +88,7 @@ func (b *Bucket) Iter(_ context.Context, dir string, f func(string) error) error
}
// Get returns a reader for the given object name.
-func (b *Bucket) Get(_ context.Context, name string) (io.ReadCloser, error) {
+func (b *InMemBucket) Get(_ context.Context, name string) (io.ReadCloser, error) {
if name == "" {
return nil, errors.New("inmem: object name is empty")
}
@@ -105,7 +104,7 @@ func (b *Bucket) Get(_ context.Context, name string) (io.ReadCloser, error) {
}
// GetRange returns a new range reader for the given object name and range.
-func (b *Bucket) GetRange(_ context.Context, name string, off, length int64) (io.ReadCloser, error) {
+func (b *InMemBucket) GetRange(_ context.Context, name string, off, length int64) (io.ReadCloser, error) {
if name == "" {
return nil, errors.New("inmem: object name is empty")
}
@@ -138,7 +137,7 @@ func (b *Bucket) GetRange(_ context.Context, name string, off, length int64) (io
}
// Exists checks if the given directory exists in memory.
-func (b *Bucket) Exists(_ context.Context, name string) (bool, error) {
+func (b *InMemBucket) Exists(_ context.Context, name string) (bool, error) {
b.mtx.RLock()
defer b.mtx.RUnlock()
_, ok := b.objects[name]
@@ -146,7 +145,7 @@ func (b *Bucket) Exists(_ context.Context, name string) (bool, error) {
}
// ObjectSize returns the size of the specified object.
-func (b *Bucket) ObjectSize(_ context.Context, name string) (uint64, error) {
+func (b *InMemBucket) ObjectSize(_ context.Context, name string) (uint64, error) {
b.mtx.RLock()
file, ok := b.objects[name]
b.mtx.RUnlock()
@@ -157,7 +156,7 @@ func (b *Bucket) ObjectSize(_ context.Context, name string) (uint64, error) {
}
// Upload writes the file specified in src into memory.
-func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
+func (b *InMemBucket) Upload(_ context.Context, name string, r io.Reader) error {
b.mtx.Lock()
defer b.mtx.Unlock()
body, err := ioutil.ReadAll(r)
@@ -169,7 +168,7 @@ func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
}
// Delete removes all data prefixed with the dir.
-func (b *Bucket) Delete(_ context.Context, name string) error {
+func (b *InMemBucket) Delete(_ context.Context, name string) error {
b.mtx.Lock()
defer b.mtx.Unlock()
if _, ok := b.objects[name]; !ok {
@@ -180,13 +179,13 @@ func (b *Bucket) Delete(_ context.Context, name string) error {
}
// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
-func (b *Bucket) IsObjNotFoundErr(err error) bool {
- return err == errNotFound
+func (b *InMemBucket) IsObjNotFoundErr(err error) bool {
+ return errors.Cause(err) == errNotFound
}
-func (b *Bucket) Close() error { return nil }
+func (b *InMemBucket) Close() error { return nil }
// Name returns the bucket name.
-func (b *Bucket) Name() string {
+func (b *InMemBucket) Name() string {
return "inmem"
}
diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go
index 23926054ad2..c1bb44d0792 100644
--- a/pkg/objstore/objstore.go
+++ b/pkg/objstore/objstore.go
@@ -6,13 +6,10 @@ package objstore
import (
"bytes"
"context"
- "fmt"
"io"
- "math/rand"
"os"
"path/filepath"
"strings"
- "testing"
"time"
"github.com/go-kit/kit/log"
@@ -41,22 +38,18 @@ type Bucket interface {
Name() string
}
-// TryToGetSize tries to get upfront size from reader.
-// TODO(https://github.com/thanos-io/thanos/issues/678): Remove guessing length when minio provider will support multipart upload without this.
-func TryToGetSize(r io.Reader) (int64, error) {
- switch f := r.(type) {
- case *os.File:
- fileInfo, err := f.Stat()
- if err != nil {
- return 0, errors.Wrap(err, "os.File.Stat()")
- }
- return fileInfo.Size(), nil
- case *bytes.Buffer:
- return int64(f.Len()), nil
- case *strings.Reader:
- return f.Size(), nil
- }
- return 0, errors.New("unsupported type of io.Reader")
+// InstrumentedBucket is a Bucket with optional instrumentation control on reader.
+type InstrumentedBucket interface {
+ Bucket
+
+ // WithExpectedErrs allows specifying a filter that marks certain errors as expected, so they will not increment the
+ // thanos_objstore_bucket_operation_failures_total metric.
+ WithExpectedErrs(IsOpFailureExpectedFunc) Bucket
+
+ // ReaderWithExpectedErrs allows specifying a filter that marks certain errors as expected, so they will not increment the
+ // thanos_objstore_bucket_operation_failures_total metric.
+ // TODO(bwplotka): Remove this when moved to Go 1.14 and replace with InstrumentedBucketReader.
+ ReaderWithExpectedErrs(IsOpFailureExpectedFunc) BucketReader
}
// BucketReader provides read access to an object storage bucket.
@@ -81,6 +74,33 @@ type BucketReader interface {
ObjectSize(ctx context.Context, name string) (uint64, error)
}
+// InstrumentedBucketReader is a BucketReader with optional instrumentation control.
+type InstrumentedBucketReader interface {
+ BucketReader
+
+ // ReaderWithExpectedErrs allows specifying a filter that marks certain errors as expected, so they will not increment the
+ // thanos_objstore_bucket_operation_failures_total metric.
+ ReaderWithExpectedErrs(IsOpFailureExpectedFunc) BucketReader
+}
+
+// TryToGetSize tries to get upfront size from reader.
+// TODO(https://github.com/thanos-io/thanos/issues/678): Remove guessing length when minio provider will support multipart upload without this.
+func TryToGetSize(r io.Reader) (int64, error) {
+ switch f := r.(type) {
+ case *os.File:
+ fileInfo, err := f.Stat()
+ if err != nil {
+ return 0, errors.Wrap(err, "os.File.Stat()")
+ }
+ return fileInfo.Size(), nil
+ case *bytes.Buffer:
+ return int64(f.Len()), nil
+ case *strings.Reader:
+ return f.Size(), nil
+ }
+ return 0, errors.New("unsupported type of io.Reader")
+}
+
// UploadDir uploads all files in srcdir to the bucket into a top-level directory
// named dstdir. It is the caller's responsibility to clean up partial uploads in case of failure.
func UploadDir(ctx context.Context, logger log.Logger, bkt Bucket, srcdir, dstdir string) error {
@@ -200,27 +220,32 @@ const (
deleteOp = "delete"
)
+// IsOpFailureExpectedFunc allows marking certain errors as expected, so they will not increment the thanos_objstore_bucket_operation_failures_total metric.
+type IsOpFailureExpectedFunc func(error) bool
+
+var _ InstrumentedBucket = &metricBucket{}
+
// BucketWithMetrics takes a bucket and registers metrics with the given registry for
// operations run against the bucket.
-func BucketWithMetrics(name string, b Bucket, reg prometheus.Registerer) Bucket {
+func BucketWithMetrics(name string, b Bucket, reg prometheus.Registerer) *metricBucket {
bkt := &metricBucket{
- bkt: b,
-
+ bkt: b,
+ isOpFailureExpected: func(err error) bool { return false },
ops: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_objstore_bucket_operations_total",
- Help: "Total number of operations against a bucket.",
+ Help: "Total number of all attempted operations against a bucket.",
ConstLabels: prometheus.Labels{"bucket": name},
}, []string{"operation"}),
opsFailures: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_objstore_bucket_operation_failures_total",
- Help: "Total number of operations against a bucket that failed.",
+ Help: "Total number of operations against a bucket that failed, but were not expected to fail in certain way from caller perspective. Those errors have to be investigated.",
ConstLabels: prometheus.Labels{"bucket": name},
}, []string{"operation"}),
opsDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "thanos_objstore_bucket_operation_duration_seconds",
- Help: "Duration of operations against the bucket",
+ Help: "Duration of successful operations against the bucket",
ConstLabels: prometheus.Labels{"bucket": name},
Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
}, []string{"operation"}),
@@ -229,7 +254,15 @@ func BucketWithMetrics(name string, b Bucket, reg prometheus.Registerer) Bucket
Help: "Second timestamp of the last successful upload to the bucket.",
}, []string{"bucket"}),
}
- for _, op := range []string{iterOp, sizeOp, getOp, getRangeOp, existsOp, uploadOp, deleteOp} {
+ for _, op := range []string{
+ iterOp,
+ sizeOp,
+ getOp,
+ getRangeOp,
+ existsOp,
+ uploadOp,
+ deleteOp,
+ } {
bkt.ops.WithLabelValues(op)
bkt.opsFailures.WithLabelValues(op)
bkt.opsDuration.WithLabelValues(op)
@@ -241,107 +274,142 @@ func BucketWithMetrics(name string, b Bucket, reg prometheus.Registerer) Bucket
type metricBucket struct {
bkt Bucket
- ops *prometheus.CounterVec
- opsFailures *prometheus.CounterVec
+ ops *prometheus.CounterVec
+ opsFailures *prometheus.CounterVec
+ isOpFailureExpected IsOpFailureExpectedFunc
+
opsDuration *prometheus.HistogramVec
lastSuccessfulUploadTime *prometheus.GaugeVec
}
+func (b *metricBucket) WithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket {
+ return &metricBucket{
+ bkt: b.bkt,
+ ops: b.ops,
+ opsFailures: b.opsFailures,
+ isOpFailureExpected: fn,
+ opsDuration: b.opsDuration,
+ lastSuccessfulUploadTime: b.lastSuccessfulUploadTime,
+ }
+}
+
+func (b *metricBucket) ReaderWithExpectedErrs(fn IsOpFailureExpectedFunc) BucketReader {
+ return b.WithExpectedErrs(fn)
+}
+
func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error) error {
+ const op = iterOp
+ b.ops.WithLabelValues(op).Inc()
+
err := b.bkt.Iter(ctx, dir, f)
- if err != nil {
- b.opsFailures.WithLabelValues(iterOp).Inc()
+ if err != nil && !b.isOpFailureExpected(err) {
+ b.opsFailures.WithLabelValues(op).Inc()
}
- b.ops.WithLabelValues(iterOp).Inc()
-
return err
}
-// ObjectSize returns the size of the specified object.
func (b *metricBucket) ObjectSize(ctx context.Context, name string) (uint64, error) {
- b.ops.WithLabelValues(sizeOp).Inc()
- start := time.Now()
+ const op = sizeOp
+ b.ops.WithLabelValues(op).Inc()
+ start := time.Now()
rc, err := b.bkt.ObjectSize(ctx, name)
if err != nil {
- b.opsFailures.WithLabelValues(sizeOp).Inc()
+ if !b.isOpFailureExpected(err) {
+ b.opsFailures.WithLabelValues(op).Inc()
+ }
return 0, err
}
- b.opsDuration.WithLabelValues(sizeOp).Observe(time.Since(start).Seconds())
+ b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
return rc, nil
}
func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
- b.ops.WithLabelValues(getOp).Inc()
+ const op = getOp
+ b.ops.WithLabelValues(op).Inc()
rc, err := b.bkt.Get(ctx, name)
if err != nil {
- b.opsFailures.WithLabelValues(getOp).Inc()
+ if !b.isOpFailureExpected(err) {
+ b.opsFailures.WithLabelValues(op).Inc()
+ }
return nil, err
}
return newTimingReadCloser(
rc,
- getOp,
+ op,
b.opsDuration,
b.opsFailures,
+ b.isOpFailureExpected,
), nil
}
func (b *metricBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
- b.ops.WithLabelValues(getRangeOp).Inc()
+ const op = getRangeOp
+ b.ops.WithLabelValues(op).Inc()
rc, err := b.bkt.GetRange(ctx, name, off, length)
if err != nil {
- b.opsFailures.WithLabelValues(getRangeOp).Inc()
+ if !b.isOpFailureExpected(err) {
+ b.opsFailures.WithLabelValues(op).Inc()
+ }
return nil, err
}
return newTimingReadCloser(
rc,
- getRangeOp,
+ op,
b.opsDuration,
b.opsFailures,
+ b.isOpFailureExpected,
), nil
}
func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) {
- start := time.Now()
+ const op = existsOp
+ b.ops.WithLabelValues(op).Inc()
+ start := time.Now()
ok, err := b.bkt.Exists(ctx, name)
if err != nil {
- b.opsFailures.WithLabelValues(existsOp).Inc()
+ if !b.isOpFailureExpected(err) {
+ b.opsFailures.WithLabelValues(op).Inc()
+ }
+ return false, err
}
- b.ops.WithLabelValues(existsOp).Inc()
- b.opsDuration.WithLabelValues(existsOp).Observe(time.Since(start).Seconds())
-
- return ok, err
+ b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
+ return ok, nil
}
func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) error {
- start := time.Now()
+ const op = uploadOp
+ b.ops.WithLabelValues(op).Inc()
- err := b.bkt.Upload(ctx, name, r)
- if err != nil {
- b.opsFailures.WithLabelValues(uploadOp).Inc()
- } else {
- b.lastSuccessfulUploadTime.WithLabelValues(b.bkt.Name()).SetToCurrentTime()
+ start := time.Now()
+ if err := b.bkt.Upload(ctx, name, r); err != nil {
+ if !b.isOpFailureExpected(err) {
+ b.opsFailures.WithLabelValues(op).Inc()
+ }
+ return err
}
- b.ops.WithLabelValues(uploadOp).Inc()
- b.opsDuration.WithLabelValues(uploadOp).Observe(time.Since(start).Seconds())
-
- return err
+ b.lastSuccessfulUploadTime.WithLabelValues(b.bkt.Name()).SetToCurrentTime()
+ b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
+ return nil
}
func (b *metricBucket) Delete(ctx context.Context, name string) error {
- start := time.Now()
+ const op = deleteOp
+ b.ops.WithLabelValues(op).Inc()
- err := b.bkt.Delete(ctx, name)
- if err != nil {
- b.opsFailures.WithLabelValues(deleteOp).Inc()
+ start := time.Now()
+ if err := b.bkt.Delete(ctx, name); err != nil {
+ if !b.isOpFailureExpected(err) {
+ b.opsFailures.WithLabelValues(op).Inc()
+ }
+ return err
}
- b.ops.WithLabelValues(deleteOp).Inc()
- b.opsDuration.WithLabelValues(deleteOp).Observe(time.Since(start).Seconds())
+ b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
- return err
+ return nil
}
func (b *metricBucket) IsObjNotFoundErr(err error) bool {
@@ -359,53 +427,49 @@ func (b *metricBucket) Name() string {
type timingReadCloser struct {
io.ReadCloser
- ok bool
- start time.Time
- op string
- duration *prometheus.HistogramVec
- failed *prometheus.CounterVec
+ alreadyGotErr bool
+
+ start time.Time
+ op string
+ duration *prometheus.HistogramVec
+ failed *prometheus.CounterVec
+ isFailureExpected IsOpFailureExpectedFunc
}
-func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec) *timingReadCloser {
+func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc) *timingReadCloser {
// Initialize the metrics with 0.
dur.WithLabelValues(op)
failed.WithLabelValues(op)
return &timingReadCloser{
- ReadCloser: rc,
- ok: true,
- start: time.Now(),
- op: op,
- duration: dur,
- failed: failed,
+ ReadCloser: rc,
+ start: time.Now(),
+ op: op,
+ duration: dur,
+ failed: failed,
+ isFailureExpected: isFailureExpected,
}
}
func (rc *timingReadCloser) Close() error {
err := rc.ReadCloser.Close()
- rc.duration.WithLabelValues(rc.op).Observe(time.Since(rc.start).Seconds())
- if rc.ok && err != nil {
+ if !rc.alreadyGotErr && err != nil {
rc.failed.WithLabelValues(rc.op).Inc()
- rc.ok = false
+ }
+ if !rc.alreadyGotErr && err == nil {
+ rc.duration.WithLabelValues(rc.op).Observe(time.Since(rc.start).Seconds())
+ rc.alreadyGotErr = true
}
return err
}
func (rc *timingReadCloser) Read(b []byte) (n int, err error) {
n, err = rc.ReadCloser.Read(b)
- if rc.ok && err != nil && err != io.EOF {
- rc.failed.WithLabelValues(rc.op).Inc()
- rc.ok = false
+ // Report metric just once.
+ if !rc.alreadyGotErr && err != nil && err != io.EOF {
+ if !rc.isFailureExpected(err) {
+ rc.failed.WithLabelValues(rc.op).Inc()
+ }
+ rc.alreadyGotErr = true
}
return n, err
}
-
-func CreateTemporaryTestBucketName(t testing.TB) string {
- src := rand.NewSource(time.Now().UnixNano())
-
- // Bucket name need to conform: https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-s3-bucket-naming-requirements.html.
- name := strings.Replace(strings.Replace(fmt.Sprintf("test_%x_%s", src.Int63(), strings.ToLower(t.Name())), "_", "-", -1), "/", "-", -1)
- if len(name) >= 63 {
- name = name[:63]
- }
- return name
-}
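Taken together, the metricBucket changes mean every call still increments thanos_objstore_bucket_operations_total, while thanos_objstore_bucket_operation_failures_total only moves when the configured filter does not mark the error as expected. A sketch of the observable effect, assuming an in-memory bucket and an illustrative registry:

package example

import (
	"context"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/thanos/pkg/objstore"
)

func expectedErrsSketch(ctx context.Context) {
	reg := prometheus.NewRegistry()
	bkt := objstore.BucketWithMetrics("example", objstore.NewInMemBucket(), reg)

	// Default view: a Get on a missing object counts as a failure.
	_, _ = bkt.Get(ctx, "missing/object")

	// Lenient view: the same Get still counts as an operation, but the
	// not-found error is marked expected and is not counted as a failure.
	lenient := bkt.WithExpectedErrs(bkt.IsObjNotFoundErr)
	_, _ = lenient.Get(ctx, "missing/object")
}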
diff --git a/pkg/objstore/objstore_test.go b/pkg/objstore/objstore_test.go
new file mode 100644
index 00000000000..173bf3d6999
--- /dev/null
+++ b/pkg/objstore/objstore_test.go
@@ -0,0 +1,61 @@
+package objstore
+
+import (
+ "testing"
+
+ promtest "github.com/prometheus/client_golang/prometheus/testutil"
+ "github.com/thanos-io/thanos/pkg/testutil"
+)
+
+func TestMetricBucket_Close(t *testing.T) {
+ bkt := BucketWithMetrics("abc", NewInMemBucket(), nil)
+ // Expect metrics for all 7 operations to be initialized.
+ testutil.Equals(t, 7, promtest.CollectAndCount(bkt.ops))
+ testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsFailures))
+ testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration))
+
+ AcceptanceTest(t, bkt.WithExpectedErrs(bkt.IsObjNotFoundErr))
+ testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(iterOp)))
+ testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(sizeOp)))
+ testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(getOp)))
+ testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(getRangeOp)))
+ testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(existsOp)))
+ testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(uploadOp)))
+ testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(deleteOp)))
+ testutil.Equals(t, 7, promtest.CollectAndCount(bkt.ops))
+ testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(iterOp)))
+ testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(sizeOp)))
+ testutil.Equals(t, float64(1), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getOp)))
+ testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getRangeOp)))
+ testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(existsOp)))
+ testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(uploadOp)))
+ testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(deleteOp)))
+ testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsFailures))
+ testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration))
+ lastUpload := promtest.ToFloat64(bkt.lastSuccessfulUploadTime)
+ testutil.Assert(t, lastUpload > 0, "last upload not greater than 0, val: %f", lastUpload)
+
+ // Clear the bucket, but keep the metrics, to ensure we keep using the same ones.
+ bkt.bkt = NewInMemBucket()
+ AcceptanceTest(t, bkt)
+ testutil.Equals(t, float64(12), promtest.ToFloat64(bkt.ops.WithLabelValues(iterOp)))
+ testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(sizeOp)))
+ testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(getOp)))
+ testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(getRangeOp)))
+ testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(existsOp)))
+ testutil.Equals(t, float64(12), promtest.ToFloat64(bkt.ops.WithLabelValues(uploadOp)))
+ testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(deleteOp)))
+ testutil.Equals(t, 7, promtest.CollectAndCount(bkt.ops))
+ testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(iterOp)))
+ // The not-found error is no longer marked as expected here, so it counts as a failure.
+ testutil.Equals(t, float64(1), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(sizeOp)))
+ // Not-found errors are unexpected now, so they increment the failure metric for get as well, hence +2.
+ testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getOp)))
+ testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getRangeOp)))
+ testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(existsOp)))
+ testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(uploadOp)))
+ testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(deleteOp)))
+ testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsFailures))
+ testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration))
+ testutil.Assert(t, promtest.ToFloat64(bkt.lastSuccessfulUploadTime) > lastUpload)
+}
diff --git a/pkg/objstore/objtesting/acceptance_e2e_test.go b/pkg/objstore/objtesting/acceptance_e2e_test.go
index 86045424eea..4cfda44c337 100644
--- a/pkg/objstore/objtesting/acceptance_e2e_test.go
+++ b/pkg/objstore/objtesting/acceptance_e2e_test.go
@@ -4,14 +4,9 @@
package objtesting
import (
- "context"
- "io/ioutil"
- "sort"
- "strings"
"testing"
"github.com/thanos-io/thanos/pkg/objstore"
- "github.com/thanos-io/thanos/pkg/testutil"
)
// TestObjStore_AcceptanceTest_e2e tests all known implementations against the interface behaviour contract we agreed on.
@@ -19,142 +14,5 @@ import (
// NOTE: This test assumes strong consistency, but in the same way it does not guarantee that if it passes, the
// used object store is strongly consistent.
func TestObjStore_AcceptanceTest_e2e(t *testing.T) {
- ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
- ctx := context.Background()
-
- _, err := bkt.Get(ctx, "")
- testutil.NotOk(t, err)
- testutil.Assert(t, !bkt.IsObjNotFoundErr(err), "expected user error got not found %s", err)
-
- _, err = bkt.Get(ctx, "id1/obj_1.some")
- testutil.NotOk(t, err)
- testutil.Assert(t, bkt.IsObjNotFoundErr(err), "expected not found error got %s", err)
-
- ok, err := bkt.Exists(ctx, "id1/obj_1.some")
- testutil.Ok(t, err)
- testutil.Assert(t, !ok, "expected not exits")
-
- _, err = bkt.ObjectSize(ctx, "id1/obj_1.some")
- testutil.NotOk(t, err)
- testutil.Assert(t, bkt.IsObjNotFoundErr(err), "expected not found error but got %s", err)
-
- // Upload first object.
- testutil.Ok(t, bkt.Upload(ctx, "id1/obj_1.some", strings.NewReader("@test-data@")))
-
- // Double check we can immediately read it.
- rc1, err := bkt.Get(ctx, "id1/obj_1.some")
- testutil.Ok(t, err)
- defer func() { testutil.Ok(t, rc1.Close()) }()
- content, err := ioutil.ReadAll(rc1)
- testutil.Ok(t, err)
- testutil.Equals(t, "@test-data@", string(content))
-
- // Check if we can get the correct size.
- sz, err := bkt.ObjectSize(ctx, "id1/obj_1.some")
- testutil.Ok(t, err)
- testutil.Assert(t, sz == 11, "expected size to be equal to 11")
-
- rc2, err := bkt.GetRange(ctx, "id1/obj_1.some", 1, 3)
- testutil.Ok(t, err)
- defer func() { testutil.Ok(t, rc2.Close()) }()
- content, err = ioutil.ReadAll(rc2)
- testutil.Ok(t, err)
- testutil.Equals(t, "tes", string(content))
-
- // Unspecified range with offset.
- rcUnspecifiedLen, err := bkt.GetRange(ctx, "id1/obj_1.some", 1, -1)
- testutil.Ok(t, err)
- defer func() { testutil.Ok(t, rcUnspecifiedLen.Close()) }()
- content, err = ioutil.ReadAll(rcUnspecifiedLen)
- testutil.Ok(t, err)
- testutil.Equals(t, "test-data@", string(content))
-
- // Out of band offset. Do not rely on outcome.
- // NOTE: For various providers we have different outcome.
- // * GCS is giving 416 status code
- // * S3 errors immdiately with invalid range error.
- // * inmem and filesystem are returning 0 bytes.
- //rcOffset, err := bkt.GetRange(ctx, "id1/obj_1.some", 124141, 3)
-
- // Out of band length. We expect to read file fully.
- rcLength, err := bkt.GetRange(ctx, "id1/obj_1.some", 3, 9999)
- testutil.Ok(t, err)
- defer func() { testutil.Ok(t, rcLength.Close()) }()
- content, err = ioutil.ReadAll(rcLength)
- testutil.Ok(t, err)
- testutil.Equals(t, "st-data@", string(content))
-
- ok, err = bkt.Exists(ctx, "id1/obj_1.some")
- testutil.Ok(t, err)
- testutil.Assert(t, ok, "expected exits")
-
- // Upload other objects.
- testutil.Ok(t, bkt.Upload(ctx, "id1/obj_2.some", strings.NewReader("@test-data2@")))
- // Upload should be idempotent.
- testutil.Ok(t, bkt.Upload(ctx, "id1/obj_2.some", strings.NewReader("@test-data2@")))
- testutil.Ok(t, bkt.Upload(ctx, "id1/obj_3.some", strings.NewReader("@test-data3@")))
- testutil.Ok(t, bkt.Upload(ctx, "id2/obj_4.some", strings.NewReader("@test-data4@")))
- testutil.Ok(t, bkt.Upload(ctx, "obj_5.some", strings.NewReader("@test-data5@")))
-
- // Can we iter over items from top dir?
- var seen []string
- testutil.Ok(t, bkt.Iter(ctx, "", func(fn string) error {
- seen = append(seen, fn)
- return nil
- }))
- expected := []string{"obj_5.some", "id1/", "id2/"}
- sort.Strings(expected)
- sort.Strings(seen)
- testutil.Equals(t, expected, seen)
-
- // Can we iter over items from id1/ dir?
- seen = []string{}
- testutil.Ok(t, bkt.Iter(ctx, "id1/", func(fn string) error {
- seen = append(seen, fn)
- return nil
- }))
- testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some"}, seen)
-
- // Can we iter over items from id1 dir?
- seen = []string{}
- testutil.Ok(t, bkt.Iter(ctx, "id1", func(fn string) error {
- seen = append(seen, fn)
- return nil
- }))
- testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some"}, seen)
-
- // Can we iter over items from not existing dir?
- testutil.Ok(t, bkt.Iter(ctx, "id0", func(fn string) error {
- t.Error("Not expected to loop through not existing directory")
- t.FailNow()
-
- return nil
- }))
-
- testutil.Ok(t, bkt.Delete(ctx, "id1/obj_2.some"))
-
- // Delete is expected to fail on non existing object.
- // NOTE: Don't rely on this. S3 is not complying with this as GCS is.
- // testutil.NotOk(t, bkt.Delete(ctx, "id1/obj_2.some"))
-
- // Can we iter over items from id1/ dir and see obj2 being deleted?
- seen = []string{}
- testutil.Ok(t, bkt.Iter(ctx, "id1/", func(fn string) error {
- seen = append(seen, fn)
- return nil
- }))
- testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_3.some"}, seen)
-
- testutil.Ok(t, bkt.Delete(ctx, "id2/obj_4.some"))
-
- seen = []string{}
- testutil.Ok(t, bkt.Iter(ctx, "", func(fn string) error {
- seen = append(seen, fn)
- return nil
- }))
- expected = []string{"obj_5.some", "id1/"}
- sort.Strings(expected)
- sort.Strings(seen)
- testutil.Equals(t, expected, seen)
- })
+ ForeachStore(t, objstore.AcceptanceTest)
}
diff --git a/pkg/objstore/objtesting/foreach.go b/pkg/objstore/objtesting/foreach.go
index 97b0490977c..e0c28cb4f6d 100644
--- a/pkg/objstore/objtesting/foreach.go
+++ b/pkg/objstore/objtesting/foreach.go
@@ -16,7 +16,6 @@ import (
"github.com/thanos-io/thanos/pkg/objstore/azure"
"github.com/thanos-io/thanos/pkg/objstore/cos"
"github.com/thanos-io/thanos/pkg/objstore/gcs"
- "github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/objstore/oss"
"github.com/thanos-io/thanos/pkg/objstore/s3"
"github.com/thanos-io/thanos/pkg/objstore/swift"
@@ -48,7 +47,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
// Mandatory Inmem. Not parallel, to detect problem early.
if ok := t.Run("inmem", func(t *testing.T) {
- testFn(t, inmem.NewBucket())
+ testFn(t, objstore.NewInMemBucket())
}); !ok {
return
}
diff --git a/pkg/objstore/oss/oss.go b/pkg/objstore/oss/oss.go
index 5c081d7a5a1..8a8d72086c9 100644
--- a/pkg/objstore/oss/oss.go
+++ b/pkg/objstore/oss/oss.go
@@ -150,7 +150,7 @@ func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) {
return 0, errors.New("content-length header not found")
}
// NewBucket returns a new Bucket using the provided oss config values.
func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) {
var config Config
if err := yaml.Unmarshal(conf, &config); err != nil {
diff --git a/pkg/objstore/s3/s3.go b/pkg/objstore/s3/s3.go
index 285bebaf7d1..0d87f2cb1e5 100644
--- a/pkg/objstore/s3/s3.go
+++ b/pkg/objstore/s3/s3.go
@@ -93,7 +93,7 @@ func parseConfig(conf []byte) (Config, error) {
return config, nil
}
// NewBucket returns a new Bucket using the provided s3 config values.
func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) {
config, err := parseConfig(conf)
if err != nil {
diff --git a/pkg/objstore/testing.go b/pkg/objstore/testing.go
index 9e9a864b915..e7886ea7207 100644
--- a/pkg/objstore/testing.go
+++ b/pkg/objstore/testing.go
@@ -5,11 +5,29 @@ package objstore
import (
"context"
+ "fmt"
+ "io/ioutil"
+ "math/rand"
+ "sort"
"strings"
"sync"
"testing"
+ "time"
+
+ "github.com/thanos-io/thanos/pkg/testutil"
)
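+// CreateTemporaryTestBucketName returns a random name for a temporary test bucket.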
+func CreateTemporaryTestBucketName(t testing.TB) string {
+ src := rand.NewSource(time.Now().UnixNano())
+
+ // Bucket names need to conform to: https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-s3-bucket-naming-requirements.html.
+ name := strings.Replace(strings.Replace(fmt.Sprintf("test_%x_%s", src.Int63(), strings.ToLower(t.Name())), "_", "-", -1), "/", "-", -1)
+ if len(name) >= 63 {
+ name = name[:63]
+ }
+ return name
+}
+
// EmptyBucket deletes all objects from bucket. This operation is required to properly delete bucket as a whole.
// It is used for testing only.
// TODO(bplotka): Add retries.
@@ -44,3 +62,154 @@ func EmptyBucket(t testing.TB, ctx context.Context, bkt Bucket) {
}
wg.Wait()
}
+
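+// NoopInstrumentedBucket wraps a Bucket to satisfy InstrumentedBucket without
+// any instrumentation; the expected-errors methods simply return the bucket itself.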
+type NoopInstrumentedBucket struct {
+ Bucket
+}
+
+func (b NoopInstrumentedBucket) WithExpectedErrs(IsOpFailureExpectedFunc) Bucket {
+ return b
+}
+
+func (b NoopInstrumentedBucket) ReaderWithExpectedErrs(IsOpFailureExpectedFunc) BucketReader {
+ return b
+}
+
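+// AcceptanceTest exercises the behaviour contract that every objstore.Bucket implementation is expected to satisfy.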
+func AcceptanceTest(t *testing.T, bkt Bucket) {
+ ctx := context.Background()
+
+ _, err := bkt.Get(ctx, "")
+ testutil.NotOk(t, err)
+ testutil.Assert(t, !bkt.IsObjNotFoundErr(err), "expected user error got not found %s", err)
+
+ _, err = bkt.Get(ctx, "id1/obj_1.some")
+ testutil.NotOk(t, err)
+ testutil.Assert(t, bkt.IsObjNotFoundErr(err), "expected not found error got %s", err)
+
+ ok, err := bkt.Exists(ctx, "id1/obj_1.some")
+ testutil.Ok(t, err)
+ testutil.Assert(t, !ok, "expected not exits")
+
+ _, err = bkt.ObjectSize(ctx, "id1/obj_1.some")
+ testutil.NotOk(t, err)
+ testutil.Assert(t, bkt.IsObjNotFoundErr(err), "expected not found error but got %s", err)
+
+ // Upload first object.
+ testutil.Ok(t, bkt.Upload(ctx, "id1/obj_1.some", strings.NewReader("@test-data@")))
+
+ // Double check we can immediately read it.
+ rc1, err := bkt.Get(ctx, "id1/obj_1.some")
+ testutil.Ok(t, err)
+ defer func() { testutil.Ok(t, rc1.Close()) }()
+ content, err := ioutil.ReadAll(rc1)
+ testutil.Ok(t, err)
+ testutil.Equals(t, "@test-data@", string(content))
+
+ // Check if we can get the correct size.
+ sz, err := bkt.ObjectSize(ctx, "id1/obj_1.some")
+ testutil.Ok(t, err)
+ testutil.Assert(t, sz == 11, "expected size to be equal to 11")
+
+ rc2, err := bkt.GetRange(ctx, "id1/obj_1.some", 1, 3)
+ testutil.Ok(t, err)
+ defer func() { testutil.Ok(t, rc2.Close()) }()
+ content, err = ioutil.ReadAll(rc2)
+ testutil.Ok(t, err)
+ testutil.Equals(t, "tes", string(content))
+
+ // Unspecified range with offset.
+ rcUnspecifiedLen, err := bkt.GetRange(ctx, "id1/obj_1.some", 1, -1)
+ testutil.Ok(t, err)
+ defer func() { testutil.Ok(t, rcUnspecifiedLen.Close()) }()
+ content, err = ioutil.ReadAll(rcUnspecifiedLen)
+ testutil.Ok(t, err)
+ testutil.Equals(t, "test-data@", string(content))
+
+ // Out-of-bounds offset. Do not rely on the outcome.
+ // NOTE: Different providers produce different outcomes:
+ // * GCS returns a 416 status code.
+ // * S3 errors immediately with an invalid range error.
+ // * inmem and filesystem return 0 bytes.
+ //rcOffset, err := bkt.GetRange(ctx, "id1/obj_1.some", 124141, 3)
+
+ // Out-of-bounds length. We expect to read the file fully.
+ rcLength, err := bkt.GetRange(ctx, "id1/obj_1.some", 3, 9999)
+ testutil.Ok(t, err)
+ defer func() { testutil.Ok(t, rcLength.Close()) }()
+ content, err = ioutil.ReadAll(rcLength)
+ testutil.Ok(t, err)
+ testutil.Equals(t, "st-data@", string(content))
+
+ ok, err = bkt.Exists(ctx, "id1/obj_1.some")
+ testutil.Ok(t, err)
+ testutil.Assert(t, ok, "expected exits")
+
+ // Upload other objects.
+ testutil.Ok(t, bkt.Upload(ctx, "id1/obj_2.some", strings.NewReader("@test-data2@")))
+ // Upload should be idempotent.
+ testutil.Ok(t, bkt.Upload(ctx, "id1/obj_2.some", strings.NewReader("@test-data2@")))
+ testutil.Ok(t, bkt.Upload(ctx, "id1/obj_3.some", strings.NewReader("@test-data3@")))
+ testutil.Ok(t, bkt.Upload(ctx, "id2/obj_4.some", strings.NewReader("@test-data4@")))
+ testutil.Ok(t, bkt.Upload(ctx, "obj_5.some", strings.NewReader("@test-data5@")))
+
+ // Can we iter over items from top dir?
+ var seen []string
+ testutil.Ok(t, bkt.Iter(ctx, "", func(fn string) error {
+ seen = append(seen, fn)
+ return nil
+ }))
+ expected := []string{"obj_5.some", "id1/", "id2/"}
+ sort.Strings(expected)
+ sort.Strings(seen)
+ testutil.Equals(t, expected, seen)
+
+ // Can we iter over items from id1/ dir?
+ seen = []string{}
+ testutil.Ok(t, bkt.Iter(ctx, "id1/", func(fn string) error {
+ seen = append(seen, fn)
+ return nil
+ }))
+ testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some"}, seen)
+
+ // Can we iter over items from id1 dir?
+ seen = []string{}
+ testutil.Ok(t, bkt.Iter(ctx, "id1", func(fn string) error {
+ seen = append(seen, fn)
+ return nil
+ }))
+ testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some"}, seen)
+
+ // Can we iter over items from not existing dir?
+ testutil.Ok(t, bkt.Iter(ctx, "id0", func(fn string) error {
+ t.Error("Not expected to loop through not existing directory")
+ t.FailNow()
+
+ return nil
+ }))
+
+ testutil.Ok(t, bkt.Delete(ctx, "id1/obj_2.some"))
+
+ // Delete is expected to fail on a non-existing object.
+ // NOTE: Don't rely on this; S3 does not comply with this the way GCS does.
+ // testutil.NotOk(t, bkt.Delete(ctx, "id1/obj_2.some"))
+
+ // Can we iter over items from id1/ dir and see obj2 being deleted?
+ seen = []string{}
+ testutil.Ok(t, bkt.Iter(ctx, "id1/", func(fn string) error {
+ seen = append(seen, fn)
+ return nil
+ }))
+ testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_3.some"}, seen)
+
+ testutil.Ok(t, bkt.Delete(ctx, "id2/obj_4.some"))
+
+ seen = []string{}
+ testutil.Ok(t, bkt.Iter(ctx, "", func(fn string) error {
+ seen = append(seen, fn)
+ return nil
+ }))
+ expected = []string{"obj_5.some", "id1/"}
+ sort.Strings(expected)
+ sort.Strings(seen)
+ testutil.Equals(t, expected, seen)
+}
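With the acceptance suite exported from pkg/objstore, any Bucket implementation can be checked against the shared behaviour contract, which is what the slimmed-down objtesting test above now does via ForeachStore. A sketch of standalone reuse, assuming a test file in an external package:

package example

import (
	"testing"

	"github.com/thanos-io/thanos/pkg/objstore"
)

// TestInMem_Acceptance runs the shared contract against the in-memory
// bucket; a provider test would construct its own Bucket instead.
func TestInMem_Acceptance(t *testing.T) {
	objstore.AcceptanceTest(t, objstore.NewInMemBucket())
}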
diff --git a/pkg/replicate/scheme.go b/pkg/replicate/scheme.go
index ba5fa113ba3..5402dd9a78e 100644
--- a/pkg/replicate/scheme.go
+++ b/pkg/replicate/scheme.go
@@ -101,7 +101,7 @@ type blockFilterFunc func(b *metadata.Meta) bool
// TODO: Add filters field.
type replicationScheme struct {
- fromBkt objstore.BucketReader
+ fromBkt objstore.InstrumentedBucketReader
toBkt objstore.Bucket
blockFilter blockFilterFunc
@@ -158,7 +158,7 @@ func newReplicationScheme(
metrics *replicationMetrics,
blockFilter blockFilterFunc,
fetcher thanosblock.MetadataFetcher,
- from objstore.BucketReader,
+ from objstore.InstrumentedBucketReader,
to objstore.Bucket,
reg prometheus.Registerer,
) *replicationScheme {
@@ -254,7 +254,7 @@ func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id uli
level.Debug(rs.logger).Log("msg", "ensuring block is replicated", "block_uuid", blockID)
- originMetaFile, err := rs.fromBkt.Get(ctx, metaFile)
+ originMetaFile, err := rs.fromBkt.ReaderWithExpectedErrs(rs.fromBkt.IsObjNotFoundErr).Get(ctx, metaFile)
if err != nil {
return errors.Wrap(err, "get meta file from origin bucket")
}
diff --git a/pkg/replicate/scheme_test.go b/pkg/replicate/scheme_test.go
index 26c83aafb64..58cd3a9b1f9 100644
--- a/pkg/replicate/scheme_test.go
+++ b/pkg/replicate/scheme_test.go
@@ -23,7 +23,6 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/objstore"
- "github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/testutil"
)
@@ -66,20 +65,20 @@ func TestReplicationSchemeAll(t *testing.T) {
var cases = []struct {
name string
selector labels.Selector
- prepare func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket)
- assert func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket)
+ prepare func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket)
+ assert func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket)
}{
{
name: "EmptyOrigin",
- prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) {},
- assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) {},
+ prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {},
+ assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {},
},
{
name: "NoMeta",
- prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) {
+ prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
_ = originBucket.Upload(ctx, path.Join(testULID(0).String(), "chunks", "000001"), bytes.NewReader(nil))
},
- assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) {
+ assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
if len(targetBucket.Objects()) != 0 {
t.Fatal("TargetBucket should have been empty but is not.")
}
@@ -87,10 +86,10 @@ func TestReplicationSchemeAll(t *testing.T) {
},
{
name: "PartialMeta",
- prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) {
+ prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
_ = originBucket.Upload(ctx, path.Join(testULID(0).String(), "meta.json"), bytes.NewReader([]byte("{")))
},
- assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) {
+ assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
if len(targetBucket.Objects()) != 0 {
t.Fatal("TargetBucket should have been empty but is not.")
}
@@ -98,7 +97,7 @@ func TestReplicationSchemeAll(t *testing.T) {
},
{
name: "FullBlock",
- prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) {
+ prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
ulid := testULID(0)
meta := testMeta(ulid)
@@ -108,7 +107,7 @@ func TestReplicationSchemeAll(t *testing.T) {
_ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil))
_ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil))
},
- assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) {
+ assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
if len(targetBucket.Objects()) != 3 {
t.Fatal("TargetBucket should have one block made up of three objects replicated.")
}
@@ -116,7 +115,7 @@ func TestReplicationSchemeAll(t *testing.T) {
},
{
name: "PreviousPartialUpload",
- prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) {
+ prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
ulid := testULID(0)
meta := testMeta(ulid)
@@ -130,7 +129,7 @@ func TestReplicationSchemeAll(t *testing.T) {
_ = targetBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil))
_ = targetBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil))
},
- assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) {
+ assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
for k := range originBucket.Objects() {
if !bytes.Equal(originBucket.Objects()[k], targetBucket.Objects()[k]) {
t.Fatalf("Object %s not equal in origin and target bucket.", k)
@@ -140,7 +139,7 @@ func TestReplicationSchemeAll(t *testing.T) {
},
{
name: "OnlyUploadsRaw",
- prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) {
+ prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
ulid := testULID(0)
meta := testMeta(ulid)
@@ -160,7 +159,7 @@ func TestReplicationSchemeAll(t *testing.T) {
_ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil))
_ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil))
},
- assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) {
+ assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
expected := 3
got := len(targetBucket.Objects())
if got != expected {
@@ -170,7 +169,7 @@ func TestReplicationSchemeAll(t *testing.T) {
},
{
name: "UploadMultipleCandidatesWhenPresent",
- prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) {
+ prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
ulid := testULID(0)
meta := testMeta(ulid)
@@ -189,7 +188,7 @@ func TestReplicationSchemeAll(t *testing.T) {
_ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil))
_ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil))
},
- assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) {
+ assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
expected := 6
got := len(targetBucket.Objects())
if got != expected {
@@ -199,7 +198,7 @@ func TestReplicationSchemeAll(t *testing.T) {
},
{
name: "LabelSelector",
- prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) {
+ prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
ulid := testULID(0)
meta := testMeta(ulid)
@@ -219,7 +218,7 @@ func TestReplicationSchemeAll(t *testing.T) {
_ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil))
_ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil))
},
- assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) {
+ assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
expected := 3
got := len(targetBucket.Objects())
if got != expected {
@@ -229,7 +228,7 @@ func TestReplicationSchemeAll(t *testing.T) {
},
{
name: "NonZeroCompaction",
- prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) {
+ prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
ulid := testULID(0)
meta := testMeta(ulid)
meta.BlockMeta.Compaction.Level = 2
@@ -240,7 +239,7 @@ func TestReplicationSchemeAll(t *testing.T) {
_ = originBucket.Upload(ctx, path.Join(ulid.String(), "chunks", "000001"), bytes.NewReader(nil))
_ = originBucket.Upload(ctx, path.Join(ulid.String(), "index"), bytes.NewReader(nil))
},
- assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) {
+ assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
if len(targetBucket.Objects()) != 0 {
t.Fatal("TargetBucket should have been empty but is not.")
}
@@ -249,7 +248,7 @@ func TestReplicationSchemeAll(t *testing.T) {
{
name: "Regression",
selector: labels.Selector{},
- prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket objstore.Bucket) {
+ prepare: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
b := []byte(`{
"ulid": "01DQYXMK8G108CEBQ79Y84DYVY",
"minTime": 1571911200000,
@@ -282,7 +281,7 @@ func TestReplicationSchemeAll(t *testing.T) {
_ = originBucket.Upload(ctx, path.Join("01DQYXMK8G108CEBQ79Y84DYVY", "chunks", "000001"), bytes.NewReader(nil))
_ = originBucket.Upload(ctx, path.Join("01DQYXMK8G108CEBQ79Y84DYVY", "index"), bytes.NewReader(nil))
},
- assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *inmem.Bucket) {
+ assert: func(ctx context.Context, t *testing.T, originBucket, targetBucket *objstore.InMemBucket) {
if len(targetBucket.Objects()) != 3 {
t.Fatal("TargetBucket should have one block does not.")
}
@@ -296,8 +295,8 @@ func TestReplicationSchemeAll(t *testing.T) {
for _, c := range cases {
ctx := context.Background()
- originBucket := inmem.NewBucket()
- targetBucket := inmem.NewBucket()
+ originBucket := objstore.NewInMemBucket()
+ targetBucket := objstore.NewInMemBucket()
logger := testLogger(t.Name() + "/" + c.name)
c.prepare(ctx, t, originBucket, targetBucket)
@@ -313,7 +312,7 @@ func TestReplicationSchemeAll(t *testing.T) {
}
filter := NewBlockFilter(logger, selector, compact.ResolutionLevelRaw, 1).Filter
- fetcher, err := block.NewMetaFetcher(logger, 32, originBucket, "", nil, nil, nil)
+ fetcher, err := block.NewMetaFetcher(logger, 32, objstore.NoopInstrumentedBucket{originBucket}, "", nil, nil, nil)
testutil.Ok(t, err)
r := newReplicationScheme(
@@ -321,7 +320,7 @@ func TestReplicationSchemeAll(t *testing.T) {
newReplicationMetrics(nil),
filter,
fetcher,
- originBucket,
+ objstore.NoopInstrumentedBucket{originBucket},
targetBucket,
nil,
)
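
For orientation, the wiring these replication tests converge on can be sketched as follows. This is a minimal, hedged sketch rather than part of the patch: it uses only the constructors and the `NoopInstrumentedBucket` wrapper visible in the hunks above, and passes `nil` for the registerer, filters, and modifiers exactly as the test does.

```go
package replicate_sketch

import (
	"github.com/go-kit/kit/log"

	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// newTestFetcher mirrors the test wiring above: the in-memory bucket now
// comes straight from pkg/objstore, and anything that requires an
// objstore.InstrumentedBucketReader (here the meta fetcher) receives the
// bucket wrapped in the no-op instrumentation adapter.
func newTestFetcher(logger log.Logger) (*block.MetaFetcher, *objstore.InMemBucket, error) {
	origin := objstore.NewInMemBucket()
	fetcher, err := block.NewMetaFetcher(logger, 32, objstore.NoopInstrumentedBucket{origin}, "", nil, nil, nil)
	return fetcher, origin, err
}
```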
diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go
index 19695426f23..b307b165328 100644
--- a/pkg/shipper/shipper_e2e_test.go
+++ b/pkg/shipper/shipper_e2e_test.go
@@ -15,7 +15,6 @@ import (
"testing"
"time"
- "github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
"github.com/go-kit/kit/log"
@@ -180,7 +179,7 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) {
testutil.Ok(t, os.RemoveAll(dir))
}()
- bkt := inmem.NewBucket()
+ bkt := objstore.NewInMemBucket()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
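
The shipper test only needs the constructor move. For readers migrating their own code, the shape of the change is: the `pkg/objstore/inmem` subpackage is gone, and the in-memory bucket is constructed from `pkg/objstore` directly. A small, self-contained sketch of the API the tests above rely on (object name and payload are illustrative):

```go
package objstore_sketch

import (
	"bytes"
	"context"
	"path"

	"github.com/thanos-io/thanos/pkg/objstore"
)

// countObjects uploads one object and returns how many objects the bucket
// holds. NewInMemBucket replaces the old inmem.NewBucket, and Objects()
// still exposes the raw map[string][]byte used by the assertions in the
// replication tests above.
func countObjects(ctx context.Context) (int, error) {
	bkt := objstore.NewInMemBucket()
	if err := bkt.Upload(ctx, path.Join("block", "meta.json"), bytes.NewReader([]byte("{}"))); err != nil {
		return 0, err
	}
	return len(bkt.Objects()), nil
}
```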
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 81dcb4ff1e2..4d2704b1fea 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -221,7 +221,7 @@ type FilterConfig struct {
type BucketStore struct {
logger log.Logger
metrics *bucketStoreMetrics
- bkt objstore.BucketReader
+ bkt objstore.InstrumentedBucketReader
fetcher block.MetadataFetcher
dir string
indexCache storecache.IndexCache
@@ -260,7 +260,7 @@ type BucketStore struct {
func NewBucketStore(
logger log.Logger,
reg prometheus.Registerer,
- bucket objstore.BucketReader,
+ bkt objstore.InstrumentedBucketReader,
fetcher block.MetadataFetcher,
dir string,
indexCache storecache.IndexCache,
@@ -290,7 +290,7 @@ func NewBucketStore(
metrics := newBucketStoreMetrics(reg)
s := &BucketStore{
logger: logger,
- bkt: bucket,
+ bkt: bkt,
fetcher: fetcher,
dir: dir,
indexCache: indexCache,
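
Since `BucketStore` now holds an `objstore.InstrumentedBucketReader`, call sites that only have a plain `objstore.Bucket` must adapt it. The tests below all do so inline with the no-op wrapper; the helper here is a hypothetical convenience (not in the patch) that names the pattern:

```go
package store_sketch

import "github.com/thanos-io/thanos/pkg/objstore"

// asInstrumented is a hypothetical helper illustrating the adaptation the
// tests below perform inline: NoopInstrumentedBucket embeds the bucket and
// satisfies InstrumentedBucketReader without registering any metrics, so
// plain test buckets can feed NewBucketStore and NewMetaFetcher unchanged.
func asInstrumented(bkt objstore.Bucket) objstore.InstrumentedBucketReader {
	return objstore.NoopInstrumentedBucket{bkt}
}
```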
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index 8bcf3216c08..97e6a053b4d 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -21,7 +21,6 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
- "github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/objstore/objtesting"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/storepb"
@@ -147,7 +146,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
maxTime: maxTime,
}
- metaFetcher, err := block.NewMetaFetcher(s.logger, 20, bkt, dir, nil, []block.MetadataFilter{
+ metaFetcher, err := block.NewMetaFetcher(s.logger, 20, objstore.NoopInstrumentedBucket{bkt}, dir, nil, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
}, nil)
@@ -156,7 +155,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
store, err := NewBucketStore(
s.logger,
nil,
- bkt,
+ objstore.NoopInstrumentedBucket{bkt},
metaFetcher,
dir,
s.cache,
@@ -497,7 +496,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- bkt := inmem.NewBucket()
+ bkt := objstore.NewInMemBucket()
dir, err := ioutil.TempDir("", "test_bucket_time_part_e2e")
testutil.Ok(t, err)
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 516bf684c3a..446256fea96 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -43,7 +43,6 @@ import (
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/filesystem"
- "github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/pool"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/storepb"
@@ -526,7 +525,7 @@ func TestBucketStore_Sharding(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
- bkt := inmem.NewBucket()
+ bkt := objstore.NewInMemBucket()
series := []labels.Labels{labels.FromStrings("a", "1", "b", "1")}
id1, err := e2eutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "a"}, {Name: "region", Value: "r1"}}, 0)
@@ -711,7 +710,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
testutil.Ok(t, yaml.Unmarshal([]byte(sc.relabel), &relabelConf))
rec := &recorder{Bucket: bkt}
- metaFetcher, err := block.NewMetaFetcher(logger, 20, bkt, dir, nil, []block.MetadataFilter{
+ metaFetcher, err := block.NewMetaFetcher(logger, 20, objstore.NoopInstrumentedBucket{bkt}, dir, nil, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConf),
}, nil)
@@ -720,7 +719,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
bucketStore, err := NewBucketStore(
logger,
nil,
- rec,
+ objstore.NoopInstrumentedBucket{rec},
metaFetcher,
dir,
noopCache{},
@@ -813,7 +812,7 @@ func expectedTouchedBlockOps(all []ulid.ULID, expected []ulid.ULID, cached []uli
// Regression tests against: https://github.com/thanos-io/thanos/issues/1983.
func TestReadIndexCache_LoadSeries(t *testing.T) {
- bkt := inmem.NewBucket()
+ bkt := objstore.NewInMemBucket()
s := newBucketStoreMetrics(nil)
b := &bucketBlock{
@@ -1310,7 +1309,7 @@ func benchSeries(t testutil.TB, number int, dimension Dimension, cases ...int) {
}
store := &BucketStore{
- bkt: bkt,
+ bkt: objstore.NoopInstrumentedBucket{bkt},
logger: logger,
indexCache: noopCache{},
metrics: newBucketStoreMetrics(nil),
@@ -1551,7 +1550,7 @@ func TestSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
}
store := &BucketStore{
- bkt: bkt,
+ bkt: objstore.NoopInstrumentedBucket{bkt},
logger: logger,
indexCache: indexCache,
metrics: newBucketStoreMetrics(nil),
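
Note the `testSharding` hunk above: the test's `recorder` (defined elsewhere in bucket_test.go) embeds the bucket to track touched objects, and after this change it too is wrapped in `NoopInstrumentedBucket` before reaching `NewBucketStore`. A hypothetical decorator in the same spirit, showing why embedding keeps such a wrapper compatible with `objstore.Bucket`:

```go
package store_sketch

import (
	"context"
	"io"
	"sync"

	"github.com/thanos-io/thanos/pkg/objstore"
)

// recordingBucket is a hypothetical stand-in for the test's recorder: it
// embeds an objstore.Bucket (inheriting every method) and overrides Get to
// note each object touched. Because it still satisfies objstore.Bucket, it
// can itself be wrapped, e.g.
// objstore.NoopInstrumentedBucket{&recordingBucket{Bucket: bkt}}.
type recordingBucket struct {
	objstore.Bucket

	mu      sync.Mutex
	touched []string
}

func (r *recordingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
	r.mu.Lock()
	r.touched = append(r.touched, name)
	r.mu.Unlock()
	return r.Bucket.Get(ctx, name)
}
```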
diff --git a/pkg/testutil/testutil.go b/pkg/testutil/testutil.go
index e1ba298d7eb..8b1e8e1d4f1 100644
--- a/pkg/testutil/testutil.go
+++ b/pkg/testutil/testutil.go
@@ -18,6 +18,7 @@ import (
// Assert fails the test if the condition is false.
func Assert(tb testing.TB, condition bool, v ...interface{}) {
+ tb.Helper()
if condition {
return
}
@@ -27,11 +28,12 @@ func Assert(tb testing.TB, condition bool, v ...interface{}) {
if len(v) > 0 {
msg = fmt.Sprintf(v[0].(string), v[1:]...)
}
- tb.Fatalf("\033[31m%s:%d: "+msg+"\033[39m\n\n", append([]interface{}{filepath.Base(file), line}, v...)...)
+ tb.Fatalf("\033[31m%s:%d: "+msg+"\033[39m\n\n", filepath.Base(file), line)
}
// Ok fails the test if an err is not nil.
func Ok(tb testing.TB, err error, v ...interface{}) {
+ tb.Helper()
if err == nil {
return
}
@@ -46,6 +48,7 @@ func Ok(tb testing.TB, err error, v ...interface{}) {
// NotOk fails the test if an err is nil.
func NotOk(tb testing.TB, err error, v ...interface{}) {
+ tb.Helper()
if err != nil {
return
}
@@ -60,6 +63,7 @@ func NotOk(tb testing.TB, err error, v ...interface{}) {
// Equals fails the test if exp is not equal to act.
func Equals(tb testing.TB, exp, act interface{}, v ...interface{}) {
+ tb.Helper()
if reflect.DeepEqual(exp, act) {
return
}
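
The testutil changes do two things: `tb.Helper()` marks each assertion helper so failures are attributed to the calling test line rather than to testutil itself, and the `Fatalf` call in `Assert` stops passing `v...` a second time (the old call formatted the extra arguments into `msg` and then appended them again as varargs, corrupting the output). A minimal sketch of the `tb.Helper()` effect, using only the standard library:

```go
package testutil_sketch

import "testing"

// ok is a stripped-down assertion helper in the style of testutil.Ok.
// tb.Helper() tells the testing package to skip this function's frame
// when computing the file:line of a failure, so the report points at
// the caller of ok rather than at the Fatalf below.
func ok(tb testing.TB, err error) {
	tb.Helper()
	if err != nil {
		tb.Fatalf("unexpected error: %v", err)
	}
}

func TestSketch(t *testing.T) {
	ok(t, nil) // a failure would be reported against this line
}
```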