diff --git a/CHANGELOG.md b/CHANGELOG.md index e2a7706202..642fe2707d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan We use *breaking* word for marking changes that are not backward compatible (relates only to v0.y.z releases.) ## Unreleased + ### Fixed - [#1856](https://github.com/thanos-io/thanos/pull/1856) Receive: close DBReadOnly after flushing to fix a memory leak. diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go index 51f07d13fe..18f71c982e 100644 --- a/pkg/block/block_test.go +++ b/pkg/block/block_test.go @@ -20,8 +20,6 @@ import ( "github.com/oklog/ulid" ) -// NOTE(bplotka): For block packages we cannot use testutil, because they import block package. Consider moving simple -// testutil methods to separate package. func TestIsBlockDir(t *testing.T) { for _, tc := range []struct { input string @@ -58,10 +56,7 @@ func TestIsBlockDir(t *testing.T) { } { t.Run(tc.input, func(t *testing.T) { id, ok := IsBlockDir(tc.input) - if ok != tc.bdir { - t.Errorf("expected block dir != %v", tc.bdir) - t.FailNow() - } + testutil.Equals(t, tc.bdir, ok) if id.Compare(tc.id) != 0 { t.Errorf("expected %s got %s", tc.id, id) diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go new file mode 100644 index 0000000000..7e2d5a5423 --- /dev/null +++ b/pkg/block/fetcher.go @@ -0,0 +1,410 @@ +package block + +import ( + "context" + "encoding/json" + stderrors "errors" + "io/ioutil" + "os" + "path" + "path/filepath" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/relabel" + tsdberrors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/fileutil" + "github.com/thanos-io/thanos/pkg/block/metadata" + 
"github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/model" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/runutil" +) + +type syncMetrics struct { + syncs prometheus.Counter + syncFailures prometheus.Counter + syncDuration prometheus.Histogram + + synced *extprom.TxGaugeVec +} + +const ( + syncMetricSubSys = "blocks_meta" + + corruptedMeta = "corrupted-meta-json" + noMeta = "no-meta-json" + loadedMeta = "loaded" + failedMeta = "failed" + + // Filter's label values. + labelExcludedMeta = "label-excluded" + timeExcludedMeta = "time-excluded" + TooFreshMeta = "too-fresh" +) + +func newSyncMetrics(r prometheus.Registerer) *syncMetrics { + var m syncMetrics + + m.syncs = prometheus.NewCounter(prometheus.CounterOpts{ + Subsystem: syncMetricSubSys, + Name: "syncs_total", + Help: "Total blocks metadata synchronization attempts", + }) + m.syncFailures = prometheus.NewCounter(prometheus.CounterOpts{ + Subsystem: syncMetricSubSys, + Name: "sync_failures_total", + Help: "Total blocks metadata synchronization failures", + }) + m.syncDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Subsystem: syncMetricSubSys, + Name: "sync_duration_seconds", + Help: "Duration of the blocks metadata synchronization in seconds", + Buckets: []float64{0.01, 1, 10, 100, 1000}, + }) + m.synced = extprom.NewTxGaugeVec(prometheus.GaugeOpts{ + Subsystem: syncMetricSubSys, + Name: "synced", + Help: "Number of block metadata synced", + }, + []string{"state"}, + []string{corruptedMeta}, + []string{noMeta}, + []string{loadedMeta}, + []string{TooFreshMeta}, + []string{failedMeta}, + []string{labelExcludedMeta}, + []string{timeExcludedMeta}, + ) + if r != nil { + r.MustRegister( + m.syncs, + m.syncFailures, + m.syncDuration, + m.synced, + ) + } + return &m +} + +type MetadataFetcher interface { + Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) +} + +type GaugeLabeled interface { + 
WithLabelValues(lvs ...string) prometheus.Gauge +} + +type MetaFetcherFilter func(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, incompleteView bool) + +// MetaFetcher is a struct that synchronizes filtered metadata of all blocks in the object storage with the local state. +type MetaFetcher struct { + logger log.Logger + concurrency int + bkt objstore.BucketReader + + // Optional local directory to cache meta.json files. + cacheDir string + metrics *syncMetrics + + filters []MetaFetcherFilter + + cached map[ulid.ULID]*metadata.Meta +} + +// NewMetaFetcher constructs MetaFetcher. A nil logger is replaced with a no-op logger. If dir is not empty, the "meta-syncer" directory is created inside it and used as the local cache dir for meta.json files; otherwise no local cache dir is used. +func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReader, dir string, r prometheus.Registerer, filters ...MetaFetcherFilter) (*MetaFetcher, error) { + if logger == nil { + logger = log.NewNopLogger() + } + + cacheDir := "" + if dir != "" { + cacheDir = filepath.Join(dir, "meta-syncer") + if err := os.MkdirAll(cacheDir, os.ModePerm); err != nil { + return nil, err + } + } + + return &MetaFetcher{ + logger: log.With(logger, "component", "block.MetaFetcher"), + concurrency: concurrency, + bkt: bkt, + cacheDir: cacheDir, + metrics: newSyncMetrics(r), + filters: filters, + cached: map[ulid.ULID]*metadata.Meta{}, + }, nil +} + +var ( + ErrorSyncMetaNotFound = errors.New("meta.json not found") + ErrorSyncMetaCorrupted = errors.New("meta.json corrupted") +) + +// loadMeta returns metadata from object storage or error. +// It returns `ErrorSyncMetaNotFound` and `ErrorSyncMetaCorrupted` sentinel errors in those cases. The sentinel errors may be wrapped, so compare them using errors.Cause. +func (s *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta, error) { + var ( + metaFile = path.Join(id.String(), MetaFilename) + cachedBlockDir = filepath.Join(s.cacheDir, id.String()) + ) + + // TODO(bwplotka): If that causes problems (obj store rate limits), add longer ttl to cached items. + // For 1y and 100 block sources this generates ~1.5-3k HEAD RPM. AWS handles 330k RPM per prefix. 
+ // TODO(bwplotka): Consider filtering by consistency delay here (can't do until compactor healthyOverride work). + ok, err := s.bkt.Exists(ctx, metaFile) + if err != nil { + return nil, errors.Wrapf(err, "meta.json file exists: %v", metaFile) + } + if !ok { + return nil, ErrorSyncMetaNotFound + } + + if m, seen := s.cached[id]; seen { + return m, nil + } + + // Best effort load from local dir. + if s.cacheDir != "" { + m, err := metadata.Read(cachedBlockDir) + if err == nil { + return m, nil + } + + if !stderrors.Is(err, os.ErrNotExist) { + level.Warn(s.logger).Log("msg", "best effort read of the local meta.json failed; removing cached block dir", "dir", cachedBlockDir, "err", err) + if err := os.RemoveAll(cachedBlockDir); err != nil { + level.Warn(s.logger).Log("msg", "best effort remove of cached dir failed; ignoring", "dir", cachedBlockDir, "err", err) + } + } + } + + level.Debug(s.logger).Log("msg", "download meta", "name", metaFile) + + r, err := s.bkt.Get(ctx, metaFile) + if s.bkt.IsObjNotFoundErr(err) { + // Meta.json was deleted between bkt.Exists and here. + return nil, errors.Wrapf(ErrorSyncMetaNotFound, "%v", err) + } + if err != nil { + return nil, errors.Wrapf(err, "get meta file: %v", metaFile) + } + + defer runutil.CloseWithLogOnErr(s.logger, r, "close bkt meta get") + + metaContent, err := ioutil.ReadAll(r) + if err != nil { + return nil, errors.Wrapf(err, "read meta file: %v", metaFile) + } + + m := &metadata.Meta{} + if err := json.Unmarshal(metaContent, m); err != nil { + return nil, errors.Wrapf(ErrorSyncMetaCorrupted, "meta.json %v unmarshal: %v", metaFile, err) + } + + if m.Version != metadata.MetaVersion1 { + return nil, errors.Errorf("unexpected meta file: %s version: %d", metaFile, m.Version) + } + + // Best effort cache in local dir. 
+ if s.cacheDir != "" { + if err := os.MkdirAll(cachedBlockDir, os.ModePerm); err != nil { + level.Warn(s.logger).Log("msg", "best effort mkdir of the meta.json block dir failed; ignoring", "dir", cachedBlockDir, "err", err) + } + + if err := metadata.Write(s.logger, cachedBlockDir, m); err != nil { + level.Warn(s.logger).Log("msg", "best effort save of the meta.json to local dir failed; ignoring", "dir", cachedBlockDir, "err", err) + } + } + + return m, nil +} + +// Fetch returns all block metas as well as partial blocks (blocks without or with corrupted meta file) from the bucket. +// It's caller responsibility to not change the returned metadata files. Maps can be modified. +// +// Returned error indicates a failure in fetching metadata. Returned meta can be assumed as correct, with some blocks missing. +func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) { + start := time.Now() + defer func() { + s.metrics.syncDuration.Observe(time.Since(start).Seconds()) + if err != nil { + s.metrics.syncFailures.Inc() + } + }() + s.metrics.syncs.Inc() + + metas = make(map[ulid.ULID]*metadata.Meta) + partial = make(map[ulid.ULID]error) + + var ( + wg sync.WaitGroup + ch = make(chan ulid.ULID, s.concurrency) + mtx sync.Mutex + + metaErrs tsdberrors.MultiError + ) + + s.metrics.synced.ResetTx() + + for i := 0; i < s.concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + for id := range ch { + meta, err := s.loadMeta(ctx, id) + if err == nil { + mtx.Lock() + metas[id] = meta + mtx.Unlock() + continue + } + + switch errors.Cause(err) { + default: + s.metrics.synced.WithLabelValues(failedMeta).Inc() + mtx.Lock() + metaErrs.Add(err) + mtx.Unlock() + continue + case ErrorSyncMetaNotFound: + s.metrics.synced.WithLabelValues(noMeta).Inc() + case ErrorSyncMetaCorrupted: + s.metrics.synced.WithLabelValues(corruptedMeta).Inc() + } + + mtx.Lock() + partial[id] = err + mtx.Unlock() + } + }() + } + + // 
Workers scheduled, distribute blocks. + err = s.bkt.Iter(ctx, "", func(name string) error { + id, ok := IsBlockDir(name) + if !ok { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- id: + } + + return nil + }) + close(ch) + + wg.Wait() + if err != nil { + return nil, nil, errors.Wrap(err, "MetaFetcher: iter bucket") + } + + incompleteView := len(metaErrs) > 0 + + // Only for complete view of blocks update the cache. + if !incompleteView { + cached := make(map[ulid.ULID]*metadata.Meta, len(metas)) + for id, m := range metas { + cached[id] = m + } + s.cached = cached + + // Best effort cleanup of disk-cached metas. + if s.cacheDir != "" { + names, err := fileutil.ReadDir(s.cacheDir) + if err != nil { + level.Warn(s.logger).Log("msg", "best effort remove of not needed cached dirs failed; ignoring", "err", err) + } else { + for _, n := range names { + id, ok := IsBlockDir(n) + if !ok { + continue + } + + if _, ok := metas[id]; ok { + continue + } + + cachedBlockDir := filepath.Join(s.cacheDir, id.String()) + + // No such block loaded, remove the local dir. + if err := os.RemoveAll(cachedBlockDir); err != nil { + level.Warn(s.logger).Log("msg", "best effort remove of not needed cached dir failed; ignoring", "dir", cachedBlockDir, "err", err) + } + } + } + } + } + + for _, f := range s.filters { + // NOTE: filter can update synced metric accordingly to the reason of the exclude. 
+ f(metas, s.metrics.synced, incompleteView) + } + + s.metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas))) + s.metrics.synced.Submit() + + if incompleteView { + return metas, partial, errors.Wrap(metaErrs, "incomplete view") + } + + level.Info(s.logger).Log("msg", "successfully fetched block metadata", "duration", time.Since(start).String(), "cached", len(s.cached), "returned", len(metas), "partial", len(partial)) + return metas, partial, nil +} + +var _ MetaFetcherFilter = (&TimePartitionMetaFilter{}).Filter + +// TimePartitionMetaFilter is a MetaFetcher filter that filters out blocks that are outside of the specified time range. +type TimePartitionMetaFilter struct { + minTime, maxTime model.TimeOrDurationValue +} + +// NewTimePartitionMetaFilter creates TimePartitionMetaFilter. +func NewTimePartitionMetaFilter(MinTime, MaxTime model.TimeOrDurationValue) *TimePartitionMetaFilter { + return &TimePartitionMetaFilter{minTime: MinTime, maxTime: MaxTime} +} + +// Filter deletes from metas the blocks that do not overlap the [minTime, maxTime] range (both bounds inclusive) and increments the "time-excluded" synced gauge for each deleted block. +func (f *TimePartitionMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) { + for id, m := range metas { + if m.MaxTime >= f.minTime.PrometheusTimestamp() && m.MinTime <= f.maxTime.PrometheusTimestamp() { + continue + } + synced.WithLabelValues(timeExcludedMeta).Inc() + delete(metas, id) + } +} + +var _ MetaFetcherFilter = (&LabelShardedMetaFilter{}).Filter + +// LabelShardedMetaFilter is a MetaFetcher filter that filters out blocks that have no labels after relabelling. +type LabelShardedMetaFilter struct { + relabelConfig []*relabel.Config +} + +// NewLabelShardedMetaFilter creates LabelShardedMetaFilter. +func NewLabelShardedMetaFilter(relabelConfig []*relabel.Config) *LabelShardedMetaFilter { + return &LabelShardedMetaFilter{relabelConfig: relabelConfig} +} + +// Filter filters out blocks that have no labels after relabelling. 
+func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) { + for id, m := range metas { + if processedLabels := relabel.Process(labels.FromMap(m.Thanos.Labels), f.relabelConfig...); processedLabels != nil { + continue + } + synced.WithLabelValues(labelExcludedMeta).Inc() + delete(metas, id) + } +} diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go new file mode 100644 index 0000000000..8be50a928f --- /dev/null +++ b/pkg/block/fetcher_test.go @@ -0,0 +1,392 @@ +package block + +import ( + "bytes" + "context" + "encoding/json" + "io/ioutil" + "os" + "path" + "path/filepath" + "sort" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/pkg/relabel" + "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/model" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/objstore/objtesting" + "github.com/thanos-io/thanos/pkg/testutil" + "gopkg.in/yaml.v2" +) + +func ULID(i int) ulid.ULID { return ulid.MustNew(uint64(i), nil) } + +func ULIDs(is ...int) []ulid.ULID { + ret := []ulid.ULID{} + for _, i := range is { + ret = append(ret, ulid.MustNew(uint64(i), nil)) + } + + return ret +} + +func TestMetaFetcher_Fetch(t *testing.T) { + objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + dir, err := ioutil.TempDir("", "test-meta-fetcher") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + var ulidToDelete ulid.ULID + r := prometheus.NewRegistry() + f, err := NewMetaFetcher(log.NewNopLogger(), 20, bkt, dir, r, func(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, 
incompleteView bool) { + if _, ok := metas[ulidToDelete]; ok { + synced.WithLabelValues("filtered").Inc() + delete(metas, ulidToDelete) + } + }) + testutil.Ok(t, err) + + for i, tcase := range []struct { + name string + do func() + filterULID ulid.ULID + expectedMetas []ulid.ULID + expectedCorruptedMeta []ulid.ULID + expectedNoMeta []ulid.ULID + expectedFiltered int + expectedMetaErr error + }{ + { + name: "empty bucket", + do: func() {}, + + expectedMetas: ULIDs(), + expectedCorruptedMeta: ULIDs(), + expectedNoMeta: ULIDs(), + }, + { + name: "3 metas in bucket", + do: func() { + var meta metadata.Meta + meta.Version = 1 + meta.ULID = ULID(1) + + var buf bytes.Buffer + testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf)) + + meta.ULID = ULID(2) + testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf)) + + meta.ULID = ULID(3) + testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf)) + }, + + expectedMetas: ULIDs(1, 2, 3), + expectedCorruptedMeta: ULIDs(), + expectedNoMeta: ULIDs(), + }, + { + name: "nothing changed", + do: func() {}, + + expectedMetas: ULIDs(1, 2, 3), + expectedCorruptedMeta: ULIDs(), + expectedNoMeta: ULIDs(), + }, + { + name: "fresh cache", + do: func() { + f.cached = map[ulid.ULID]*metadata.Meta{} + }, + + expectedMetas: ULIDs(1, 2, 3), + expectedCorruptedMeta: ULIDs(), + expectedNoMeta: ULIDs(), + }, + { + name: "fresh cache: meta 2 and 3 have corrupted data on disk ", + do: func() { + f.cached = map[ulid.ULID]*metadata.Meta{} + + testutil.Ok(t, os.Remove(filepath.Join(dir, "meta-syncer", ULID(2).String(), MetaFilename))) + + f, err := os.OpenFile(filepath.Join(dir, "meta-syncer", ULID(3).String(), MetaFilename), os.O_WRONLY, os.ModePerm) + testutil.Ok(t, err) + + _, err = 
f.WriteString("{ almost") + testutil.Ok(t, err) + testutil.Ok(t, f.Close()) + }, + + expectedMetas: ULIDs(1, 2, 3), + expectedCorruptedMeta: ULIDs(), + expectedNoMeta: ULIDs(), + }, + { + name: "block without meta", + do: func() { + testutil.Ok(t, bkt.Upload(ctx, path.Join(ULID(4).String(), "some-file"), bytes.NewBuffer([]byte("something")))) + }, + + expectedMetas: ULIDs(1, 2, 3), + expectedCorruptedMeta: ULIDs(), + expectedNoMeta: ULIDs(4), + }, + { + name: "corrupted meta.json", + do: func() { + testutil.Ok(t, bkt.Upload(ctx, path.Join(ULID(5).String(), MetaFilename), bytes.NewBuffer([]byte("{ not a json")))) + }, + + expectedMetas: ULIDs(1, 2, 3), + expectedCorruptedMeta: ULIDs(5), + expectedNoMeta: ULIDs(4), + }, + { + name: "some added some deleted", + do: func() { + testutil.Ok(t, Delete(ctx, log.NewNopLogger(), bkt, ULID(2))) + + var meta metadata.Meta + meta.Version = 1 + meta.ULID = ULID(6) + + var buf bytes.Buffer + testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf)) + }, + + expectedMetas: ULIDs(1, 3, 6), + expectedCorruptedMeta: ULIDs(5), + expectedNoMeta: ULIDs(4), + }, + { + name: "filter not existing ulid", + do: func() {}, + filterULID: ULID(10), + + expectedMetas: ULIDs(1, 3, 6), + expectedCorruptedMeta: ULIDs(5), + expectedNoMeta: ULIDs(4), + }, + { + name: "filter ulid 1", + do: func() {}, + filterULID: ULID(1), + + expectedMetas: ULIDs(3, 6), + expectedCorruptedMeta: ULIDs(5), + expectedNoMeta: ULIDs(4), + expectedFiltered: 1, + }, + { + name: "error: not supported meta version", + do: func() { + var meta metadata.Meta + meta.Version = 20 + meta.ULID = ULID(7) + + var buf bytes.Buffer + testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf)) + }, + + expectedMetas: ULIDs(1, 3, 6), + expectedCorruptedMeta: ULIDs(5), + expectedNoMeta: ULIDs(4), + expectedMetaErr: 
errors.New("incomplete view: unexpected meta file: 00000000070000000000000000/meta.json version: 20"), + }, + } { + if ok := t.Run(tcase.name, func(t *testing.T) { + tcase.do() + + ulidToDelete = tcase.filterULID + metas, partial, err := f.Fetch(ctx) + if tcase.expectedMetaErr != nil { + testutil.NotOk(t, err) + testutil.Equals(t, tcase.expectedMetaErr.Error(), err.Error()) + } else { + testutil.Ok(t, err) + } + + { + metasSlice := make([]ulid.ULID, 0, len(metas)) + for id, m := range metas { + testutil.Assert(t, m != nil, "meta is nil") + metasSlice = append(metasSlice, id) + } + sort.Slice(metasSlice, func(i, j int) bool { + return metasSlice[i].Compare(metasSlice[j]) < 0 + }) + testutil.Equals(t, tcase.expectedMetas, metasSlice) + } + + { + partialSlice := make([]ulid.ULID, 0, len(partial)) + for id := range partial { + + partialSlice = append(partialSlice, id) + } + sort.Slice(partialSlice, func(i, j int) bool { + return partialSlice[i].Compare(partialSlice[j]) >= 0 + }) + expected := append([]ulid.ULID{}, tcase.expectedCorruptedMeta...) + expected = append(expected, tcase.expectedNoMeta...) 
+ sort.Slice(expected, func(i, j int) bool { + return expected[i].Compare(expected[j]) >= 0 + }) + testutil.Equals(t, expected, partialSlice) + } + + expectedFailures := 0 + if tcase.expectedMetaErr != nil { + expectedFailures = 1 + } + testutil.Equals(t, float64(i+1), promtest.ToFloat64(f.metrics.syncs)) + testutil.Equals(t, float64(len(tcase.expectedMetas)), promtest.ToFloat64(f.metrics.synced.WithLabelValues(loadedMeta))) + testutil.Equals(t, float64(len(tcase.expectedNoMeta)), promtest.ToFloat64(f.metrics.synced.WithLabelValues(noMeta))) + testutil.Equals(t, float64(tcase.expectedFiltered), promtest.ToFloat64(f.metrics.synced.WithLabelValues("filtered"))) + testutil.Equals(t, 0.0, promtest.ToFloat64(f.metrics.synced.WithLabelValues(labelExcludedMeta))) + testutil.Equals(t, 0.0, promtest.ToFloat64(f.metrics.synced.WithLabelValues(timeExcludedMeta))) + testutil.Equals(t, float64(expectedFailures), promtest.ToFloat64(f.metrics.synced.WithLabelValues(failedMeta))) + testutil.Equals(t, 0.0, promtest.ToFloat64(f.metrics.synced.WithLabelValues(TooFreshMeta))) + }); !ok { + return + } + } + }) +} + +func TestLabelShardedMetaFilter_Filter(t *testing.T) { + relabelContentYaml := ` + - action: drop + regex: "A" + source_labels: + - cluster + - action: keep + regex: "keepme" + source_labels: + - message + ` + var relabelConfig []*relabel.Config + testutil.Ok(t, yaml.Unmarshal([]byte(relabelContentYaml), &relabelConfig)) + + f := NewLabelShardedMetaFilter(relabelConfig) + + input := map[ulid.ULID]*metadata.Meta{ + ULID(1): { + Thanos: metadata.Thanos{ + Labels: map[string]string{"cluster": "B", "message": "keepme"}, + }, + }, + ULID(2): { + Thanos: metadata.Thanos{ + Labels: map[string]string{"something": "A", "message": "keepme"}, + }, + }, + ULID(3): { + Thanos: metadata.Thanos{ + Labels: map[string]string{"cluster": "A", "message": "keepme"}, + }, + }, + ULID(4): { + Thanos: metadata.Thanos{ + Labels: map[string]string{"cluster": "A", "something": "B", "message": 
"keepme"}, + }, + }, + ULID(5): { + Thanos: metadata.Thanos{ + Labels: map[string]string{"cluster": "B"}, + }, + }, + ULID(6): { + Thanos: metadata.Thanos{ + Labels: map[string]string{"cluster": "B", "message": "keepme"}, + }, + }, + } + expected := map[ulid.ULID]*metadata.Meta{ + ULID(1): input[ULID(1)], + ULID(2): input[ULID(2)], + ULID(6): input[ULID(6)], + } + + synced := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) + f.Filter(input, synced, false) + + testutil.Equals(t, 3.0, promtest.ToFloat64(synced.WithLabelValues(labelExcludedMeta))) + testutil.Equals(t, expected, input) + +} + +func TestTimePartitionMetaFilter_Filter(t *testing.T) { + mint := time.Unix(0, 1*time.Millisecond.Nanoseconds()) + maxt := time.Unix(0, 10*time.Millisecond.Nanoseconds()) + f := NewTimePartitionMetaFilter(model.TimeOrDurationValue{Time: &mint}, model.TimeOrDurationValue{Time: &maxt}) + + input := map[ulid.ULID]*metadata.Meta{ + ULID(1): { + BlockMeta: tsdb.BlockMeta{ + MinTime: 0, + MaxTime: 1, + }, + }, + ULID(2): { + BlockMeta: tsdb.BlockMeta{ + MinTime: 1, + MaxTime: 10, + }, + }, + ULID(3): { + BlockMeta: tsdb.BlockMeta{ + MinTime: 2, + MaxTime: 30, + }, + }, + ULID(4): { + BlockMeta: tsdb.BlockMeta{ + MinTime: 0, + MaxTime: 30, + }, + }, + ULID(5): { + BlockMeta: tsdb.BlockMeta{ + MinTime: -1, + MaxTime: 0, + }, + }, + ULID(6): { + BlockMeta: tsdb.BlockMeta{ + MinTime: 20, + MaxTime: 30, + }, + }, + } + expected := map[ulid.ULID]*metadata.Meta{ + ULID(1): input[ULID(1)], + ULID(2): input[ULID(2)], + ULID(3): input[ULID(3)], + ULID(4): input[ULID(4)], + } + + synced := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"}) + f.Filter(input, synced, false) + + testutil.Equals(t, 2.0, promtest.ToFloat64(synced.WithLabelValues(timeExcludedMeta))) + testutil.Equals(t, expected, input) + +} diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 0484aad4fd..9f35f24bff 100644 --- a/pkg/compact/compact_e2e_test.go +++ 
b/pkg/compact/compact_e2e_test.go @@ -32,7 +32,7 @@ import ( ) func TestSyncer_SyncMetas_e2e(t *testing.T) { - objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) { + objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() @@ -80,7 +80,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) { } func TestSyncer_GarbageCollect_e2e(t *testing.T) { - objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) { + objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() @@ -196,7 +196,7 @@ func MetricCount(c prometheus.Collector) int { } func TestGroup_Compact_e2e(t *testing.T) { - objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) { + objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() @@ -511,7 +511,7 @@ func TestSyncer_SyncMetasFilter_e2e(t *testing.T) { extLsets := []labels.Labels{{{Name: "cluster", Value: "A"}}, {{Name: "cluster", Value: "B"}}} - objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) { + objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() diff --git a/pkg/extprom/tx_gauge.go b/pkg/extprom/tx_gauge.go new file mode 100644 index 0000000000..d0ee7342e6 --- /dev/null +++ b/pkg/extprom/tx_gauge.go @@ -0,0 +1,89 @@ +package extprom + +import ( + "sync" + + "github.com/prometheus/client_golang/prometheus" +) + +type TxGaugeVec struct { + current *prometheus.GaugeVec + mtx sync.Mutex + newMetricVal func() *prometheus.GaugeVec + + tx *prometheus.GaugeVec +} + +// NewTxGaugeVec is a prometheus.GaugeVec that allows to start atomic metric value transaction. 
+// It might be useful for a long process that wants to update a GaugeVec but wants to build/accumulate those metrics +// in a concurrent way without exposing partial state to Prometheus. +// Caller can also use this as normal GaugeVec. +// +// Additionally it allows to init LabelValues on each transaction. +// NOTE: This is quite a naive implementation creating a new prometheus.GaugeVec on each `ResetTx`, use wisely. +func NewTxGaugeVec(opts prometheus.GaugeOpts, labelNames []string, initLabelValues ...[]string) *TxGaugeVec { + f := func() *prometheus.GaugeVec { + g := prometheus.NewGaugeVec(opts, labelNames) + for _, vals := range initLabelValues { + g.WithLabelValues(vals...) + } + return g + } + return &TxGaugeVec{ + current: f(), + newMetricVal: f, + } +} + +// ResetTx starts a new transaction by creating a fresh GaugeVec with the initial label values. Not goroutine-safe. +func (tx *TxGaugeVec) ResetTx() { + tx.tx = tx.newMetricVal() +} + +// Submit atomically and fully applies new values from the existing transaction GaugeVec; it is a no-op if no transaction was started. The transaction GaugeVec stays referenced after Submit, so later With/WithLabelValues calls made without ResetTx update the submitted GaugeVec directly. Not goroutine-safe. +func (tx *TxGaugeVec) Submit() { + if tx.tx == nil { + return + } + + tx.mtx.Lock() + tx.current = tx.tx + tx.mtx.Unlock() +} + +// Describe is used in Register; it forwards to the currently submitted GaugeVec while holding the mutex. +func (tx *TxGaugeVec) Describe(ch chan<- *prometheus.Desc) { + tx.mtx.Lock() + defer tx.mtx.Unlock() + + tx.current.Describe(ch) +} + +// Collect is used once registered; it forwards to the currently submitted GaugeVec while holding the mutex. +func (tx *TxGaugeVec) Collect(ch chan<- prometheus.Metric) { + tx.mtx.Lock() + defer tx.mtx.Unlock() + + tx.current.Collect(ch) +} + +// With works on the transaction GaugeVec (starting a transaction if none exists) as GetMetricWith, but panics where GetMetricWithLabels would have +// returned an error. Not returning an error allows shortcuts like +// myVec.With(prometheus.Labels{"code": "404", "method": "GET"}).Add(42) +func (tx *TxGaugeVec) With(labels prometheus.Labels) prometheus.Gauge { + if tx.tx == nil { + tx.ResetTx() + } + return tx.tx.With(labels) +} + +// WithLabelValues works as GetMetricWithLabelValues, but panics where +// GetMetricWithLabelValues would have returned an error. 
Not returning an +// error allows shortcuts like +// myVec.WithLabelValues("404", "GET").Add(42) +func (tx *TxGaugeVec) WithLabelValues(lvs ...string) prometheus.Gauge { + if tx.tx == nil { + tx.ResetTx() + } + return tx.tx.WithLabelValues(lvs...) +} diff --git a/pkg/extprom/tx_gauge_test.go b/pkg/extprom/tx_gauge_test.go new file mode 100644 index 0000000000..a2c3a5988c --- /dev/null +++ b/pkg/extprom/tx_gauge_test.go @@ -0,0 +1,179 @@ +package extprom + +import ( + "fmt" + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestTxGaugeVec(t *testing.T) { + g := NewTxGaugeVec(prometheus.GaugeOpts{ + Name: "metric", + }, []string{"a", "b"}, []string{"a1", "b1"}, []string{"a2", "b2"}) + + for _, tcase := range []struct { + name string + txUse func() + exp map[string]float64 + }{ + { + name: "nothing", + txUse: func() {}, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 0, + "name:\"a\" value:\"a2\" ,name:\"b\" value:\"b2\" ": 0, + }, + }, + { + name: "change a=a1,b=b1", + txUse: func() { + g.WithLabelValues("a1", "b1").Inc() + g.WithLabelValues("a1", "b1").Add(0.3) + }, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 1.3, + "name:\"a\" value:\"a2\" ,name:\"b\" value:\"b2\" ": 0, + }, + }, + { + name: "change a=a1,b=b1 again, should return same result", + txUse: func() { + g.WithLabelValues("a1", "b1").Inc() + g.WithLabelValues("a1", "b1").Add(-10) + g.WithLabelValues("a1", "b1").Add(10.3) + }, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 1.3000000000000007, // Say hi to float comparisons. 
+ "name:\"a\" value:\"a2\" ,name:\"b\" value:\"b2\" ": 0, + }, + }, + { + name: "change a=a1,b=b1 again, should return same result", + txUse: func() { + g.WithLabelValues("a1", "b1").Inc() + g.WithLabelValues("a1", "b1").Add(-10) + g.WithLabelValues("a1", "b1").Set(1.3) + }, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 1.3, + "name:\"a\" value:\"a2\" ,name:\"b\" value:\"b2\" ": 0, + }, + }, + { + name: "nothing again", + txUse: func() {}, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 0, + "name:\"a\" value:\"a2\" ,name:\"b\" value:\"b2\" ": 0, + }, + }, + { + name: "change a=aX,b=b1", + txUse: func() { + g.WithLabelValues("aX", "b1").Set(500.2) + }, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 0, + "name:\"a\" value:\"a2\" ,name:\"b\" value:\"b2\" ": 0, + "name:\"a\" value:\"aX\" ,name:\"b\" value:\"b1\" ": 500.2, + }, + }, + { + name: "change a=aX,b=b1", + txUse: func() { + g.WithLabelValues("aX", "b1").Set(500.2) + }, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 0, + "name:\"a\" value:\"a2\" ,name:\"b\" value:\"b2\" ": 0, + "name:\"a\" value:\"aX\" ,name:\"b\" value:\"b1\" ": 500.2, + }, + }, + { + name: "nothing again", + txUse: func() {}, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 0, + "name:\"a\" value:\"a2\" ,name:\"b\" value:\"b2\" ": 0, + }, + }, + { + name: "change 3 metrics", + txUse: func() { + g.WithLabelValues("a1", "b1").Inc() + g.WithLabelValues("a2", "b2").Add(-2) + g.WithLabelValues("a3", "b3").Set(1.1) + }, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 1, + "name:\"a\" value:\"a2\" ,name:\"b\" value:\"b2\" ": -2, + "name:\"a\" value:\"a3\" ,name:\"b\" value:\"b3\" ": 1.1, + }, + }, + { + name: "nothing again", + txUse: func() {}, + exp: map[string]float64{ + "name:\"a\" value:\"a1\" ,name:\"b\" value:\"b1\" ": 0, + "name:\"a\" 
value:\"a2\" ,name:\"b\" value:\"b2\" ": 0, + }, + }, + } { + if ok := t.Run(tcase.name, func(t *testing.T) { + g.ResetTx() + + tcase.txUse() + g.Submit() + + testutil.Equals(t, tcase.exp, toFloat64(t, g)) + + }); !ok { + return + } + } +} + +// toFloat64 is prometheus/client_golang/prometheus/testutil.ToFloat64 version that works with multiple labelnames. It returns the collected values keyed by the string form of each metric's label pairs and panics on a metric that is not a gauge, counter or untyped. +// NOTE: Be careful on float comparison. +func toFloat64(t *testing.T, c prometheus.Collector) map[string]float64 { + var ( + mChan = make(chan prometheus.Metric) + exp = map[string]float64{} + ) + + go func() { + c.Collect(mChan) + close(mChan) + }() + + for m := range mChan { + pb := &dto.Metric{} + testutil.Ok(t, m.Write(pb)) + if pb.Gauge != nil { + exp[lbToString(pb.GetLabel())] = pb.Gauge.GetValue() + continue + } + if pb.Counter != nil { + exp[lbToString(pb.GetLabel())] = pb.Counter.GetValue() + continue + } + if pb.Untyped == nil { // Gauge and Counter were handled above; only Untyped is left as a supported type. + panic(fmt.Errorf("collected a non-gauge/counter/untyped metric: %s", pb)) + } + exp[lbToString(pb.GetLabel())] = pb.Untyped.GetValue() + } + + return exp +} + +func lbToString(pairs []*dto.LabelPair) string { + var ret []string + for _, r := range pairs { + ret = append(ret, r.String()) + } + return strings.Join(ret, ",") +} diff --git a/pkg/objstore/objtesting/acceptance_e2e_test.go b/pkg/objstore/objtesting/acceptance_e2e_test.go index ce918289f7..1658aca397 100644 --- a/pkg/objstore/objtesting/acceptance_e2e_test.go +++ b/pkg/objstore/objtesting/acceptance_e2e_test.go @@ -16,7 +16,7 @@ import ( // NOTE: This test assumes strong consistency, but in the same way it does not guarantee that if it passes, the // used object store is strongly consistent. 
func TestObjStore_AcceptanceTest_e2e(t *testing.T) { - ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) { + ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { ctx := context.Background() _, err := bkt.Get(ctx, "") diff --git a/pkg/objstore/objtesting/foreach.go b/pkg/objstore/objtesting/foreach.go index e96ac0ef5b..07d4b352d2 100644 --- a/pkg/objstore/objtesting/foreach.go +++ b/pkg/objstore/objtesting/foreach.go @@ -40,7 +40,7 @@ func IsObjStoreSkipped(t *testing.T, provider client.ObjProvider) bool { // For each it creates a new bucket with a random name and a cleanup function // that deletes it after test was run. // Use THANOS_TEST_OBJSTORE_SKIP to skip explicitly certain object storages. -func ForeachStore(t *testing.T, testFn func(t testing.TB, bkt objstore.Bucket)) { +func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket)) { t.Parallel() // Mandatory Inmem. Not parallel, to detect problem early. diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go index 362f27f583..d84e1b6d16 100644 --- a/pkg/shipper/shipper_e2e_test.go +++ b/pkg/shipper/shipper_e2e_test.go @@ -27,7 +27,7 @@ import ( ) func TestShipper_SyncBlocks_e2e(t *testing.T) { - objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) { + objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { dir, err := ioutil.TempDir("", "shipper-e2e-test") testutil.Ok(t, err) defer func() { diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index d7623e0f84..0ea774e87f 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -391,7 +391,7 @@ func testBucketStore_e2e(t testing.TB, ctx context.Context, s *storeSuite) { } func TestBucketStore_e2e(t *testing.T) { - objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) { + objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -439,7 +439,7 
@@ func (g naivePartitioner) Partition(length int, rng func(int) (uint64, uint64)) // This tests if our, sometimes concurrent, fetches for different parts works. // Regression test against: https://github.com/thanos-io/thanos/issues/829. func TestBucketStore_ManyParts_e2e(t *testing.T) { - objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) { + objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()