diff --git a/CHANGELOG.md b/CHANGELOG.md index eb96112711..9af3bbab52 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel ### Fixed - [#1656](https://github.com/thanos-io/thanos/pull/1656) Thanos Store now starts metric and status probe HTTP server earlier in its start-up sequence. `/-/healthy` endpoint now starts to respond with success earlier. `/metrics` endpoint starts serving metrics earlier as well. Make sure to point your readiness probes to the `/-/ready` endpoint rather than `/metrics`. +- [#1669](https://github.com/thanos-io/thanos/pull/1669) Fixed store sharding. Now it does not load excluded meta.json files, nor does it load or fetch index-cache.json files for excluded blocks. ## [v0.8.1](https://github.com/thanos-io/thanos/releases/tag/v0.8.1) - 2019.10.14 diff --git a/pkg/objstore/azure/azure.go b/pkg/objstore/azure/azure.go index 8f4ee867c3..83f36a81a1 100644 --- a/pkg/objstore/azure/azure.go +++ b/pkg/objstore/azure/azure.go @@ -291,7 +291,7 @@ func NewTestBucket(t testing.TB, component string) (objstore.Bucket, func(), err conf := &Config{ StorageAccountName: os.Getenv("AZURE_STORAGE_ACCOUNT"), StorageAccountKey: os.Getenv("AZURE_STORAGE_ACCESS_KEY"), - ContainerName: "thanos-e2e-test", + ContainerName: objstore.CreateTemporaryTestBucketName(t), } bc, err := yaml.Marshal(conf) diff --git a/pkg/objstore/cos/cos.go b/pkg/objstore/cos/cos.go index d81264ef0e..19fe3ceacc 100644 --- a/pkg/objstore/cos/cos.go +++ b/pkg/objstore/cos/cos.go @@ -4,19 +4,17 @@ import ( "context" "fmt" "io" - "math/rand" "net/http" "os" "strings" "testing" - "time" "github.com/go-kit/kit/log" - cos "github.com/mozillazg/go-cos" + "github.com/mozillazg/go-cos" "github.com/pkg/errors" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/runutil" - yaml "gopkg.in/yaml.v2" + "gopkg.in/yaml.v2" ) // DirDelim is the delimiter used to model a directory structure in an object store bucket. 
@@ -311,14 +309,7 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) { t.Log("WARNING. Reusing", c.Bucket, "COS bucket for COS tests. Manual cleanup afterwards is required") return b, func() {}, nil } - - src := rand.NewSource(time.Now().UnixNano()) - - tmpBucketName := strings.Replace(fmt.Sprintf("test_%x", src.Int63()), "_", "-", -1) - if len(tmpBucketName) >= 31 { - tmpBucketName = tmpBucketName[:31] - } - c.Bucket = tmpBucketName + c.Bucket = objstore.CreateTemporaryTestBucketName(t) bc, err := yaml.Marshal(c) if err != nil { @@ -333,12 +324,12 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) { if _, err := b.client.Bucket.Put(context.Background(), nil); err != nil { return nil, nil, err } - t.Log("created temporary COS bucket for COS tests with name", tmpBucketName) + t.Log("created temporary COS bucket for COS tests with name", c.Bucket) return b, func() { objstore.EmptyBucket(t, context.Background(), b) if _, err := b.client.Bucket.Delete(context.Background()); err != nil { - t.Logf("deleting bucket %s failed: %s", tmpBucketName, err) + t.Logf("deleting bucket %s failed: %s", c.Bucket, err) } }, nil } diff --git a/pkg/objstore/gcs/gcs.go b/pkg/objstore/gcs/gcs.go index 53baf1e4f7..b3ad6ef350 100644 --- a/pkg/objstore/gcs/gcs.go +++ b/pkg/objstore/gcs/gcs.go @@ -5,11 +5,9 @@ import ( "context" "fmt" "io" - "math/rand" "runtime" "strings" "testing" - "time" "cloud.google.com/go/storage" "github.com/go-kit/kit/log" @@ -19,7 +17,7 @@ import ( "golang.org/x/oauth2/google" "google.golang.org/api/iterator" "google.golang.org/api/option" - yaml "gopkg.in/yaml.v2" + "gopkg.in/yaml.v2" ) // DirDelim is the delimiter used to model a directory structure in an object store bucket. 
@@ -169,9 +167,8 @@ func (b *Bucket) Close() error { func NewTestBucket(t testing.TB, project string) (objstore.Bucket, func(), error) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - src := rand.NewSource(time.Now().UnixNano()) gTestConfig := Config{ - Bucket: fmt.Sprintf("test_%s_%x", strings.ToLower(t.Name()), src.Int63()), + Bucket: objstore.CreateTemporaryTestBucketName(t), } bc, err := yaml.Marshal(gTestConfig) diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index 67a0ce2a12..f231d6ef1c 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -2,10 +2,13 @@ package objstore import ( "context" + "fmt" "io" + "math/rand" "os" "path/filepath" "strings" + "testing" "time" "github.com/go-kit/kit/log" @@ -109,7 +112,7 @@ func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, rc, err := bkt.Get(ctx, src) if err != nil { - return errors.Wrap(err, "get file") + return errors.Wrapf(err, "get file %s", src) } defer runutil.CloseWithLogOnErr(logger, rc, "download block's file reader") @@ -372,3 +375,14 @@ func (rc *timingReadCloser) Read(b []byte) (n int, err error) { } return n, err } + +func CreateTemporaryTestBucketName(t testing.TB) string { + src := rand.NewSource(time.Now().UnixNano()) + + // Bucket name need to conform: https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-s3-bucket-naming-requirements.html. 
+ name := strings.Replace(strings.Replace(fmt.Sprintf("test_%x_%s", src.Int63(), strings.ToLower(t.Name())), "_", "-", -1), "/", "-", -1) + if len(name) >= 63 { + name = name[:63] + } + return name +} diff --git a/pkg/objstore/objtesting/foreach.go b/pkg/objstore/objtesting/foreach.go index 0b5dffcb03..a2759554a0 100644 --- a/pkg/objstore/objtesting/foreach.go +++ b/pkg/objstore/objtesting/foreach.go @@ -3,9 +3,7 @@ package objtesting import ( "os" "testing" - "time" - "github.com/fortytw2/leaktest" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/azure" "github.com/thanos-io/thanos/pkg/objstore/cos" @@ -21,98 +19,98 @@ import ( // that deletes it after test was run. // Use THANOS_SKIP__TESTS to skip explicitly certain tests. func ForeachStore(t *testing.T, testFn func(t testing.TB, bkt objstore.Bucket)) { + t.Parallel() + // Mandatory Inmem. if ok := t.Run("inmem", func(t *testing.T) { - defer leaktest.CheckTimeout(t, 10*time.Second)() - + t.Parallel() testFn(t, inmem.NewBucket()) - }); !ok { return } // Optional GCS. if _, ok := os.LookupEnv("THANOS_SKIP_GCS_TESTS"); !ok { - bkt, closeFn, err := gcs.NewTestBucket(t, os.Getenv("GCP_PROJECT")) - testutil.Ok(t, err) + t.Run("gcs", func(t *testing.T) { + bkt, closeFn, err := gcs.NewTestBucket(t, os.Getenv("GCP_PROJECT")) + testutil.Ok(t, err) + + t.Parallel() + defer closeFn() - ok := t.Run("gcs", func(t *testing.T) { // TODO(bwplotka): Add leaktest when https://github.com/GoogleCloudPlatform/google-cloud-go/issues/1025 is resolved. testFn(t, bkt) }) - closeFn() - if !ok { - return - } + } else { t.Log("THANOS_SKIP_GCS_TESTS envvar present. Skipping test against GCS.") } // Optional S3. if _, ok := os.LookupEnv("THANOS_SKIP_S3_AWS_TESTS"); !ok { - // TODO(bwplotka): Allow taking location from envvar. - bkt, closeFn, err := s3.NewTestBucket(t, "us-west-2") - testutil.Ok(t, err) + t.Run("aws s3", func(t *testing.T) { + // TODO(bwplotka): Allow taking location from envvar. 
+ bkt, closeFn, err := s3.NewTestBucket(t, "us-west-2") + testutil.Ok(t, err) + + t.Parallel() + defer closeFn() - ok := t.Run("aws s3", func(t *testing.T) { // TODO(bwplotka): Add leaktest when we fix potential leak in minio library. // We cannot use leaktest for detecting our own potential leaks, when leaktest detects leaks in minio itself. // This needs to be investigated more. testFn(t, bkt) }) - closeFn() - if !ok { - return - } + } else { t.Log("THANOS_SKIP_S3_AWS_TESTS envvar present. Skipping test against S3 AWS.") } // Optional Azure. if _, ok := os.LookupEnv("THANOS_SKIP_AZURE_TESTS"); !ok { - bkt, closeFn, err := azure.NewTestBucket(t, "e2e-tests") - testutil.Ok(t, err) + t.Run("azure", func(t *testing.T) { + bkt, closeFn, err := azure.NewTestBucket(t, "e2e-tests") + testutil.Ok(t, err) + + t.Parallel() + defer closeFn() - ok := t.Run("azure", func(t *testing.T) { testFn(t, bkt) }) - closeFn() - if !ok { - return - } + } else { t.Log("THANOS_SKIP_AZURE_TESTS envvar present. Skipping test against Azure.") } // Optional SWIFT. if _, ok := os.LookupEnv("THANOS_SKIP_SWIFT_TESTS"); !ok { - container, closeFn, err := swift.NewTestContainer(t) - testutil.Ok(t, err) + t.Run("swift", func(t *testing.T) { + container, closeFn, err := swift.NewTestContainer(t) + testutil.Ok(t, err) + + t.Parallel() + defer closeFn() - ok := t.Run("swift", func(t *testing.T) { testFn(t, container) }) - closeFn() - if !ok { - return - } + } else { t.Log("THANOS_SKIP_SWIFT_TESTS envvar present. Skipping test against swift.") } // Optional COS. if _, ok := os.LookupEnv("THANOS_SKIP_TENCENT_COS_TESTS"); !ok { - bkt, closeFn, err := cos.NewTestBucket(t) - testutil.Ok(t, err) + t.Run("Tencent cos", func(t *testing.T) { + bkt, closeFn, err := cos.NewTestBucket(t) + testutil.Ok(t, err) + + t.Parallel() + defer closeFn() - ok := t.Run("Tencent cos", func(t *testing.T) { testFn(t, bkt) }) - closeFn() - if !ok { - return - } + } else { t.Log("THANOS_SKIP_TENCENT_COS_TESTS envvar present. 
Skipping test against Tencent COS.") } diff --git a/pkg/objstore/s3/s3.go b/pkg/objstore/s3/s3.go index bf33400a20..2c062463d5 100644 --- a/pkg/objstore/s3/s3.go +++ b/pkg/objstore/s3/s3.go @@ -6,7 +6,6 @@ import ( "crypto/tls" "fmt" "io" - "math/rand" "net" "net/http" "os" @@ -18,7 +17,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - minio "github.com/minio/minio-go/v6" + "github.com/minio/minio-go/v6" "github.com/minio/minio-go/v6/pkg/credentials" "github.com/minio/minio-go/v6/pkg/encrypt" "github.com/pkg/errors" @@ -26,7 +25,7 @@ import ( "github.com/prometheus/common/version" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/runutil" - yaml "gopkg.in/yaml.v2" + "gopkg.in/yaml.v2" ) // DirDelim is the delimiter used to model a directory structure in an object store bucket. @@ -403,13 +402,7 @@ func NewTestBucketFromConfig(t testing.TB, location string, c Config, reuseBucke } if c.Bucket == "" { - src := rand.NewSource(time.Now().UnixNano()) - - // Bucket name need to conform: https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-s3-bucket-naming-requirements.html. 
- bktToCreate = strings.Replace(fmt.Sprintf("test_%s_%x", strings.ToLower(t.Name()), src.Int63()), "_", "-", -1) - if len(bktToCreate) >= 63 { - bktToCreate = bktToCreate[:63] - } + bktToCreate = objstore.CreateTemporaryTestBucketName(t) } if err := b.client.MakeBucket(bktToCreate, location); err != nil { diff --git a/pkg/objstore/swift/swift.go b/pkg/objstore/swift/swift.go index 2c06132c3c..ca74ab0f8f 100644 --- a/pkg/objstore/swift/swift.go +++ b/pkg/objstore/swift/swift.go @@ -5,11 +5,9 @@ import ( "context" "fmt" "io" - "math/rand" "os" "strings" "testing" - "time" "github.com/go-kit/kit/log" "github.com/gophercloud/gophercloud" @@ -19,7 +17,7 @@ import ( "github.com/gophercloud/gophercloud/pagination" "github.com/pkg/errors" "github.com/thanos-io/thanos/pkg/objstore" - yaml "gopkg.in/yaml.v2" + "gopkg.in/yaml.v2" ) // DirDelim is the delimiter used to model a directory structure in an object store bucket. @@ -291,12 +289,7 @@ func NewTestContainer(t testing.TB) (objstore.Bucket, func(), error) { return c, func() {}, nil } - src := rand.NewSource(time.Now().UnixNano()) - - tmpContainerName := fmt.Sprintf("test_%s_%x", strings.ToLower(t.Name()), src.Int63()) - if len(tmpContainerName) >= 63 { - tmpContainerName = tmpContainerName[:63] - } + tmpContainerName := objstore.CreateTemporaryTestBucketName(t) if err := c.createContainer(tmpContainerName); err != nil { return nil, nil, err diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index d5cdf655de..d5ae031316 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -236,7 +236,7 @@ type BucketStore struct { filterConfig *FilterConfig relabelConfig []*relabel.Config - labelSets map[uint64]labels.Labels + advLabelSets []storepb.LabelSet enableCompatibilityLabel bool } @@ -322,14 +322,14 @@ func (s *BucketStore) Close() (err error) { // It will reuse disk space as persistent cache based on s.dir param. 
func (s *BucketStore) SyncBlocks(ctx context.Context) error { var wg sync.WaitGroup - blockc := make(chan ulid.ULID) + blockc := make(chan *metadata.Meta) for i := 0; i < s.blockSyncConcurrency; i++ { wg.Add(1) go func() { - for id := range blockc { - if err := s.addBlock(ctx, id); err != nil { - level.Warn(s.logger).Log("msg", "loading block failed", "id", id, "err", err) + for meta := range blockc { + if err := s.addBlock(ctx, meta); err != nil { + level.Warn(s.logger).Log("msg", "loading block failed", "id", meta.ULID, "err", err) continue } } @@ -346,14 +346,27 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { return nil } - inRange, err := s.isBlockInMinMaxRange(ctx, id) + bdir := path.Join(s.dir, id.String()) + meta, err := loadMeta(ctx, s.logger, s.bucket, bdir, id) + if err != nil { + return errors.Wrap(err, "load meta") + } + + inRange, err := s.isBlockInMinMaxRange(ctx, meta) if err != nil { level.Warn(s.logger).Log("msg", "error parsing block range", "block", id, "err", err) - return nil + return os.RemoveAll(bdir) } if !inRange { - return nil + return os.RemoveAll(bdir) + } + + // Check for block labels by relabeling. + // If output is empty, the block will be dropped. + if processedLabels := relabel.Process(promlabels.FromMap(meta.Thanos.Labels), s.relabelConfig...); processedLabels == nil { + level.Debug(s.logger).Log("msg", "ignoring block (drop in relabeling)", "block", id) + return os.RemoveAll(bdir) } allIDs[id] = struct{}{} @@ -363,7 +376,7 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { } select { case <-ctx.Done(): - case blockc <- id: + case blockc <- meta: } return nil }) @@ -387,11 +400,19 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { } // Sync advertise labels. + var storeLabels []storepb.Label s.mtx.Lock() - s.labelSets = make(map[uint64]labels.Labels, len(s.blocks)) - for _, bs := range s.blocks { - s.labelSets[bs.labels.Hash()] = append(labels.Labels(nil), bs.labels...) 
+ s.advLabelSets = s.advLabelSets[:0] + for _, bs := range s.blockSets { + storeLabels := storeLabels[:0] + for _, l := range bs.labels { + storeLabels = append(storeLabels, storepb.Label{Name: l.Name, Value: l.Value}) + } + s.advLabelSets = append(s.advLabelSets, storepb.LabelSet{Labels: storeLabels}) } + sort.Slice(s.advLabelSets, func(i, j int) bool { + return strings.Compare(s.advLabelSets[i].String(), s.advLabelSets[j].String()) < 0 + }) s.mtx.Unlock() return nil @@ -432,14 +453,7 @@ func (s *BucketStore) numBlocks() int { return len(s.blocks) } -func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, id ulid.ULID) (bool, error) { - dir := filepath.Join(s.dir, id.String()) - - err, meta := loadMeta(ctx, s.logger, s.bucket, dir, id) - if err != nil { - return false, err - } - +func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, meta *metadata.Meta) (bool, error) { // We check for blocks in configured minTime, maxTime range. switch { case meta.MaxTime <= s.filterConfig.MinTime.PrometheusTimestamp(): @@ -458,8 +472,8 @@ func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock { return s.blocks[id] } -func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) { - dir := filepath.Join(s.dir, id.String()) +func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err error) { + dir := filepath.Join(s.dir, meta.ULID.String()) defer func() { if err != nil { @@ -471,11 +485,14 @@ func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) { }() s.metrics.blockLoads.Inc() + lset := labels.FromMap(meta.Thanos.Labels) + h := lset.Hash() + b, err := newBucketBlock( ctx, - log.With(s.logger, "block", id), + log.With(s.logger, "block", meta.ULID), + meta, s.bucket, - id, dir, s.indexCache, s.chunkPool, @@ -487,17 +504,7 @@ func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) { s.mtx.Lock() defer s.mtx.Unlock() - lset := labels.FromMap(b.meta.Thanos.Labels) - h := lset.Hash() - 
- // Check for block labels by relabeling. - // If output is empty, the block will be dropped. - if processedLabels := relabel.Process(promlabels.FromMap(lset.Map()), s.relabelConfig...); processedLabels == nil { - level.Debug(s.logger).Log("msg", "dropping block(drop in relabeling)", "block", id) - return os.RemoveAll(dir) - } - b.labels = lset - sort.Sort(b.labels) + sort.Sort(lset) set, ok := s.blockSets[h] if !ok { @@ -569,14 +576,8 @@ func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.Info } s.mtx.RLock() - res.LabelSets = make([]storepb.LabelSet, 0, len(s.labelSets)) - for _, ls := range s.labelSets { - lset := make([]storepb.Label, 0, len(ls)) - for _, l := range ls { - lset = append(lset, storepb.Label{Name: l.Name, Value: l.Value}) - } - res.LabelSets = append(res.LabelSets, storepb.LabelSet{Labels: lset}) - } + // Should we clone? + res.LabelSets = s.advLabelSets s.mtx.RUnlock() if s.enableCompatibilityLabel && len(res.LabelSets) > 0 { @@ -1192,21 +1193,18 @@ type bucketBlock struct { lvals map[string][]string postings map[labels.Label]index.Range - id ulid.ULID chunkObjs []string pendingReaders sync.WaitGroup partitioner partitioner - - labels labels.Labels } func newBucketBlock( ctx context.Context, logger log.Logger, + meta *metadata.Meta, bkt objstore.BucketReader, - id ulid.ULID, dir string, indexCache indexCache, chunkPool *pool.BytesPool, @@ -1215,23 +1213,17 @@ func newBucketBlock( b = &bucketBlock{ logger: logger, bucket: bkt, - id: id, indexCache: indexCache, chunkPool: chunkPool, dir: dir, partitioner: p, + meta: meta, } - err, meta := loadMeta(ctx, logger, bkt, dir, id) - if err != nil { - return nil, errors.Wrap(err, "load meta") - } - b.meta = meta - if err = b.loadIndexCacheFile(ctx); err != nil { return nil, errors.Wrap(err, "load index cache") } // Get object handles for all chunk files. 
- err = bkt.Iter(ctx, path.Join(id.String(), block.ChunksDirname), func(n string) error { + err = bkt.Iter(ctx, path.Join(meta.ULID.String(), block.ChunksDirname), func(n string) error { b.chunkObjs = append(b.chunkObjs, n) return nil }) @@ -1242,33 +1234,36 @@ func newBucketBlock( } func (b *bucketBlock) indexFilename() string { - return path.Join(b.id.String(), block.IndexFilename) + return path.Join(b.meta.ULID.String(), block.IndexFilename) } func (b *bucketBlock) indexCacheFilename() string { - return path.Join(b.id.String(), block.IndexCacheFilename) + return path.Join(b.meta.ULID.String(), block.IndexCacheFilename) } -func loadMeta(ctx context.Context, logger log.Logger, bucket objstore.BucketReader, dir string, id ulid.ULID) (error, *metadata.Meta) { +func loadMeta(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID) (*metadata.Meta, error) { // If we haven't seen the block before or it is missing the meta.json, download it. if _, err := os.Stat(path.Join(dir, block.MetaFilename)); os.IsNotExist(err) { if err := os.MkdirAll(dir, 0777); err != nil { - return errors.Wrap(err, "create dir"), nil + return nil, errors.Wrap(err, "create dir") } src := path.Join(id.String(), block.MetaFilename) - if err := objstore.DownloadFile(ctx, logger, bucket, src, dir); err != nil { - return errors.Wrap(err, "download meta.json"), nil + if err := objstore.DownloadFile(ctx, logger, bkt, src, dir); err != nil { + if bkt.IsObjNotFoundErr(errors.Cause(err)) { + level.Debug(logger).Log("msg", "meta file wasn't found. 
Block not ready or being deleted.", "block", id.String()) + } + return nil, errors.Wrap(err, "download meta.json") } } else if err != nil { - return err, nil + return nil, err } meta, err := metadata.Read(dir) if err != nil { - return errors.Wrap(err, "read meta.json"), nil + return nil, errors.Wrap(err, "read meta.json") } - return nil, meta + return meta, err } func (b *bucketBlock) loadIndexCacheFile(ctx context.Context) (err error) { diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 252f3e5b93..43ce3c50b2 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -165,7 +165,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m if err := runutil.Repeat(100*time.Millisecond, ctx.Done(), func() error { return store.SyncBlocks(ctx) - }); err != nil && errors.Cause(err) != context.Canceled { + }); err != nil && ctx.Err() == nil { t.Fatal(err) } }() diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index c285df4ed4..69ccb57cc0 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -2,11 +2,14 @@ package store import ( "context" + "io" "io/ioutil" "math" + "os" "path" "path/filepath" "sort" + "sync" "testing" "time" @@ -453,7 +456,7 @@ func TestBucketStore_Info(t *testing.T) { testutil.Equals(t, storepb.StoreType_STORE, resp.StoreType) testutil.Equals(t, int64(math.MaxInt64), resp.MinTime) testutil.Equals(t, int64(math.MinInt64), resp.MaxTime) - testutil.Equals(t, []storepb.LabelSet{}, resp.LabelSets) + testutil.Equals(t, []storepb.LabelSet(nil), resp.LabelSets) testutil.Equals(t, []storepb.Label(nil), resp.Labels) } @@ -486,14 +489,12 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) { extLset, 0) testutil.Ok(t, err) - dir1, dir2 := filepath.Join(dir, id1.String()), filepath.Join(dir, id2.String()) - meta1, err := metadata.Read(dir1) + meta1, err := metadata.Read(path.Join(dir, id1.String())) testutil.Ok(t, err) - testutil.Ok(t, 
metadata.Write(log.NewNopLogger(), dir1, meta1)) - - meta2, err := metadata.Read(dir2) + meta2, err := metadata.Read(path.Join(dir, id2.String())) + testutil.Ok(t, err) + meta3, err := metadata.Read(path.Join(dir, id3.String())) testutil.Ok(t, err) - testutil.Ok(t, metadata.Write(log.NewNopLogger(), dir2, meta2)) // Run actual test. hourBeforeDur := prommodel.Duration(-1 * time.Hour) @@ -507,166 +508,298 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) { }, emptyRelabelConfig, true) testutil.Ok(t, err) - inRange, err := bucketStore.isBlockInMinMaxRange(context.TODO(), id1) + inRange, err := bucketStore.isBlockInMinMaxRange(context.TODO(), meta1) testutil.Ok(t, err) testutil.Equals(t, true, inRange) - inRange, err = bucketStore.isBlockInMinMaxRange(context.TODO(), id2) + inRange, err = bucketStore.isBlockInMinMaxRange(context.TODO(), meta2) testutil.Ok(t, err) testutil.Equals(t, true, inRange) - inRange, err = bucketStore.isBlockInMinMaxRange(context.TODO(), id3) + inRange, err = bucketStore.isBlockInMinMaxRange(context.TODO(), meta3) testutil.Ok(t, err) testutil.Equals(t, false, inRange) } -func TestBucketStore_selectorBlocks(t *testing.T) { - ctx := context.TODO() +type recorder struct { + mtx sync.Mutex + objstore.Bucket + + touched []string +} + +func (r *recorder) Get(ctx context.Context, name string) (io.ReadCloser, error) { + r.mtx.Lock() + defer r.mtx.Unlock() + + r.touched = append(r.touched, name) + return r.Bucket.Get(ctx, name) +} + +func TestBucketStore_Sharding(t *testing.T) { + ctx := context.Background() logger := log.NewNopLogger() - dir, err := ioutil.TempDir("", "selector-blocks") + + dir, err := ioutil.TempDir("", "test-sharding-prepare") testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + bkt := inmem.NewBucket() series := []labels.Labels{labels.FromStrings("a", "1", "b", "1")} - id1, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "A"}}, 0) + id1, err := 
testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "a"}, {Name: "region", Value: "r1"}}, 0) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id1.String()))) + + id2, err := testutil.CreateBlock(ctx, dir, series, 10, 1000, 2000, labels.Labels{{Name: "cluster", Value: "a"}, {Name: "region", Value: "r1"}}, 0) testutil.Ok(t, err) - testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id1.String(), block.MetaFilename), path.Join(id1.String(), block.MetaFilename))) - testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id1.String(), block.IndexFilename), path.Join(id1.String(), block.IndexFilename))) + testutil.Ok(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id2.String()))) - id2, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "B"}}, 0) + id3, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "b"}, {Name: "region", Value: "r1"}}, 0) testutil.Ok(t, err) - testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id2.String(), block.MetaFilename), path.Join(id2.String(), block.MetaFilename))) - testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id2.String(), block.IndexFilename), path.Join(id2.String(), block.IndexFilename))) + testutil.Ok(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id3.String()))) - id3, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "A"}}, 0) + id4, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "a"}, {Name: "region", Value: "r2"}}, 0) testutil.Ok(t, err) - testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id3.String(), block.MetaFilename), path.Join(id3.String(), block.MetaFilename))) - testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, 
id3.String(), block.IndexFilename), path.Join(id3.String(), block.IndexFilename))) + testutil.Ok(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id4.String()))) + if ok := t.Run("new_runs", func(t *testing.T) { + testSharding(t, "", bkt, id1, id2, id3, id4) + }); !ok { + return + } + + dir2, err := ioutil.TempDir("", "test-sharding2") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir2)) }() + + if ok := t.Run("reuse_disk", func(t *testing.T) { + testSharding(t, dir2, bkt, id1, id2, id3, id4) + }); !ok { + return + } + +} + +func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ulid.ULID) { + var cached []ulid.ULID + + logger := log.NewLogfmtLogger(os.Stderr) for _, sc := range []struct { - relabelContentYaml string - exceptedLength int - exceptedIds []ulid.ULID + name string + relabel string + expectedIDs []ulid.ULID + expectedAdvLabels []storepb.LabelSet }{ { - relabelContentYaml: ` + name: "no sharding", + expectedIDs: all, + expectedAdvLabels: []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "cluster", Value: "a"}, + {Name: "region", Value: "r1"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: "cluster", Value: "a"}, + {Name: "region", Value: "r2"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: "cluster", Value: "b"}, + {Name: "region", Value: "r1"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: CompatibilityTypeLabelName, Value: "store"}, + }, + }, + }, + }, + { + name: "drop cluster=a sources", + relabel: ` - action: drop - regex: "A" + regex: "a" source_labels: - cluster `, - exceptedLength: 1, - exceptedIds: []ulid.ULID{id2}, + expectedIDs: []ulid.ULID{all[2]}, + expectedAdvLabels: []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "cluster", Value: "b"}, + {Name: "region", Value: "r1"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: CompatibilityTypeLabelName, Value: "store"}, + }, + }, + }, }, { - relabelContentYaml: ` + name: "keep only cluster=a sources", + relabel: ` - 
action: keep - regex: "A" + regex: "a" source_labels: - cluster `, - exceptedLength: 2, - exceptedIds: []ulid.ULID{id1, id3}, + expectedIDs: []ulid.ULID{all[0], all[1], all[3]}, + expectedAdvLabels: []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "cluster", Value: "a"}, + {Name: "region", Value: "r1"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: "cluster", Value: "a"}, + {Name: "region", Value: "r2"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: CompatibilityTypeLabelName, Value: "store"}, + }, + }, + }, + }, + { + name: "keep only cluster=a without .*2 region sources", + relabel: ` + - action: keep + regex: "a" + source_labels: + - cluster + - action: drop + regex: ".*2" + source_labels: + - region + `, + expectedIDs: []ulid.ULID{all[0], all[1]}, + expectedAdvLabels: []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "cluster", Value: "a"}, + {Name: "region", Value: "r1"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: CompatibilityTypeLabelName, Value: "store"}, + }, + }, + }, + }, + { + name: "drop all", + relabel: ` + - action: drop + regex: "a" + source_labels: + - cluster + - action: drop + regex: "r1" + source_labels: + - region + `, + expectedIDs: []ulid.ULID{}, + expectedAdvLabels: []storepb.LabelSet(nil), }, } { - var relabelConf []*relabel.Config - err = yaml.Unmarshal([]byte(sc.relabelContentYaml), &relabelConf) - testutil.Ok(t, err) + t.Run(sc.name, func(t *testing.T) { + dir := reuseDisk + + if dir == "" { + var err error + dir, err = ioutil.TempDir("", "test-sharding") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + } - bucketStore, err := NewBucketStore(nil, nil, bkt, dir, noopCache{}, 0, 0, 20, false, 20, - filterConf, relabelConf, true) - testutil.Ok(t, err) + var relabelConf []*relabel.Config + testutil.Ok(t, yaml.Unmarshal([]byte(sc.relabel), &relabelConf)) - for _, id := range []ulid.ULID{id1, id2, id3} { - testutil.Ok(t, bucketStore.addBlock(ctx, id)) - } - testutil.Equals(t, 
sc.exceptedLength, len(bucketStore.blocks)) + rec := &recorder{Bucket: bkt} + bucketStore, err := NewBucketStore(logger, nil, rec, dir, noopCache{}, 0, 0, 99, false, 20, + filterConf, relabelConf, true) + testutil.Ok(t, err) - ids := make([]ulid.ULID, 0, len(bucketStore.blocks)) - for id := range bucketStore.blocks { - ids = append(ids, id) - } - sort.Slice(sc.exceptedIds, func(i, j int) bool { - return sc.exceptedIds[i].Compare(sc.exceptedIds[j]) > 0 - }) - sort.Slice(ids, func(i, j int) bool { - return ids[i].Compare(ids[j]) > 0 - }) - testutil.Equals(t, sc.exceptedIds, ids) - } -} + testutil.Ok(t, bucketStore.SyncBlocks(context.Background())) -func TestBucketStore_InfoWithLabels(t *testing.T) { - defer leaktest.CheckTimeout(t, 10*time.Second)() + // Check "stored" blocks. + ids := make([]ulid.ULID, 0, len(bucketStore.blocks)) + for id := range bucketStore.blocks { + ids = append(ids, id) + } + sort.Slice(ids, func(i, j int) bool { + return ids[i].Compare(ids[j]) < 0 + }) + testutil.Equals(t, sc.expectedIDs, ids) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + // Check Info endpoint. + resp, err := bucketStore.Info(context.Background(), &storepb.InfoRequest{}) + testutil.Ok(t, err) - dir, err := ioutil.TempDir("", "bucketstore-test") - testutil.Ok(t, err) + testutil.Equals(t, storepb.StoreType_STORE, resp.StoreType) + testutil.Equals(t, []storepb.Label(nil), resp.Labels) + testutil.Equals(t, sc.expectedAdvLabels, resp.LabelSets) - bkt := inmem.NewBucket() - series := []labels.Labels{labels.FromStrings("a", "1", "b", "1")} + // Make sure we don't download files we did not expect to. 
+ // Regression test: https://github.com/thanos-io/thanos/issues/1664 - logger := log.NewNopLogger() - id1, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "A"}}, 0) - testutil.Ok(t, err) - testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id1.String(), block.IndexFilename), path.Join(id1.String(), block.IndexFilename))) + // Sort records. We load blocks concurrently so operations might be not ordered. + sort.Strings(rec.touched) - id2, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "B"}}, 0) - testutil.Ok(t, err) - testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id2.String(), block.IndexFilename), path.Join(id2.String(), block.IndexFilename))) + if reuseDisk != "" { + testutil.Equals(t, expectedTouchedBlockOps(all, sc.expectedIDs, cached), rec.touched) + cached = sc.expectedIDs + return + } - id3, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "B"}}, 0) - testutil.Ok(t, err) - testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id3.String(), block.IndexFilename), path.Join(id3.String(), block.IndexFilename))) - - relabelContentYaml := ` - - action: drop - regex: "A" - source_labels: - - cluster - ` - var relabelConfig []*relabel.Config - err = yaml.Unmarshal([]byte(relabelContentYaml), &relabelConfig) - testutil.Ok(t, err) - bucketStore, err := NewBucketStore( - nil, - nil, - bkt, - dir, - noopCache{}, - 2e5, - 0, - 0, - false, - 20, - filterConf, - relabelConfig, - true, - ) - testutil.Ok(t, err) + testutil.Equals(t, expectedTouchedBlockOps(all, sc.expectedIDs, nil), rec.touched) + }) + } +} - err = bucketStore.SyncBlocks(ctx) - testutil.Ok(t, err) +func expectedTouchedBlockOps(all []ulid.ULID, expected []ulid.ULID, cached []ulid.ULID) []string { + var ops []string + for _, id := range all { + blockCached := false + for _, fid := range cached { + if 
id.Compare(fid) == 0 { + blockCached = true + break + } + } + if blockCached { + continue + } - resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{}) - testutil.Ok(t, err) + found := false + for _, fid := range expected { + if id.Compare(fid) == 0 { + found = true + break + } + } - testutil.Equals(t, storepb.StoreType_STORE, resp.StoreType) - testutil.Equals(t, int64(0), resp.MinTime) - testutil.Equals(t, int64(1000), resp.MaxTime) - testutil.Equals(t, []storepb.Label(nil), resp.Labels) - testutil.Equals(t, []storepb.LabelSet{ - { - Labels: []storepb.Label{ - {Name: "cluster", Value: "B"}, - }, - }, - { - Labels: []storepb.Label{ - {Name: CompatibilityTypeLabelName, Value: "store"}, - }, - }, - }, resp.LabelSets) + ops = append(ops, path.Join(id.String(), block.MetaFilename)) + if found { + ops = append(ops, + path.Join(id.String(), block.IndexCacheFilename), + path.Join(id.String(), block.IndexFilename), + ) + } + } + sort.Strings(ops) + return ops }