From ce72bfe15f6504d0daa60be29612affe13f87bf2 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Mon, 9 Nov 2020 20:08:25 +0100 Subject: [PATCH] compact: Added index size limiting planner detecting output index size over 64GB. (#3410) * compact: Added index size limiting planner detecting output index size over 64GB. Fixes: https://github.com/thanos-io/thanos/issues/1424 Signed-off-by: Bartlomiej Plotka * Addressed comments; added changelog. Signed-off-by: Bartlomiej Plotka * Skipped flaky test. Signed-off-by: Bartlomiej Plotka --- CHANGELOG.md | 5 + cmd/thanos/compact.go | 50 ++++++-- pkg/block/block.go | 32 +++++ pkg/block/block_test.go | 56 +++++++++ pkg/block/metadata/markers.go | 2 + pkg/compact/planner.go | 91 +++++++++++++- pkg/compact/planner_test.go | 224 +++++++++++++++++++++++++++++++++- test/e2e/compact_test.go | 74 +++++++++-- test/e2e/query_test.go | 16 +-- test/e2e/rule_test.go | 10 +- test/e2e/rules_api_test.go | 2 +- 11 files changed, 522 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 241ed42355..773be2925f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,8 +26,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#2979](https://github.com/thanos-io/thanos/pull/2979) Replicator: Add the ability to replicate blocks within a time frame by passing --min-time and --max-time - [#3398](https://github.com/thanos-io/thanos/pull/3398) Query Frontend: Add default config for query frontend memcached config. - [#3277](https://github.com/thanos-io/thanos/pull/3277) Thanos Query: Introduce dynamic lookback interval. This allows queries with large step to make use of downsampled data. +- [#3409](https://github.com/thanos-io/thanos/pull/3409) Compactor: Added support for no-compact-mark.json which excludes the block from compaction. ### Fixed + - [#3257](https://github.com/thanos-io/thanos/pull/3257) Ruler: Prevent Ruler from crashing when using default DNS to lookup hosts that results in "No such hosts" errors. - [#3331](https://github.com/thanos-io/thanos/pull/3331) Disable Azure blob exception logging - [#3341](https://github.com/thanos-io/thanos/pull/3341) Disable Azure blob syslog exception logging @@ -35,6 +37,9 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Changed +- [#3410](https://github.com/thanos-io/thanos/pull/3410) Compactor: Changed metric `thanos_compactor_blocks_marked_for_deletion_total` to `thanos_compactor_blocks_marked_total` with `marker` label. + Compactor will now automatically disable compaction for blocks with large index that would output blocks after compaction larger than specified value (by default: 64GB). This automatically + handles the Promethus [format limit](https://github.com/thanos-io/thanos/issues/1424). - [#2906](https://github.com/thanos-io/thanos/pull/2906) Tools: Refactor Bucket replicate execution. Removed all `thanos_replicate_origin_.*` metrics. - `thanos_replicate_origin_meta_loads_total` can be replaced by `blocks_meta_synced{state="loaded"}`. - `thanos_replicate_origin_partial_meta_reads_total` can be replaced by `blocks_meta_synced{state="failed"}`. diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 6be126c1b9..5d0ab85794 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "github.com/alecthomas/units" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" @@ -124,10 +125,13 @@ func runCompact( Name: "thanos_compactor_block_cleanup_failures_total", Help: "Failures encountered while deleting blocks in compactor.", }) - blocksMarkedForDeletion := promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "thanos_compactor_blocks_marked_for_deletion_total", - Help: "Total number of blocks marked for deletion in compactor.", - }) + blocksMarked := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_compactor_blocks_marked_total", + Help: "Total number of blocks marked in compactor.", + }, []string{"marker"}) + blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename) + blocksMarked.WithLabelValues(metadata.DeletionMarkFilename) + garbageCollectedBlocks := promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_compact_garbage_collected_blocks_total", Help: "Total number of blocks marked for deletion by compactor.", @@ -244,7 +248,7 @@ func runCompact( cf, duplicateBlocksFilter, ignoreDeletionMarkFilter, - blocksMarkedForDeletion, + blocksMarked.WithLabelValues(metadata.DeletionMarkFilename), garbageCollectedBlocks, conf.blockSyncConcurrency) if err != nil { @@ -280,9 +284,31 @@ func runCompact( return errors.Wrap(err, "clean working downsample directory") } - grouper := compact.NewDefaultGrouper(logger, bkt, conf.acceptMalformedIndex, enableVerticalCompaction, reg, blocksMarkedForDeletion, garbageCollectedBlocks) + grouper := compact.NewDefaultGrouper( + logger, + bkt, + conf.acceptMalformedIndex, + enableVerticalCompaction, + reg, + blocksMarked.WithLabelValues(metadata.DeletionMarkFilename), + garbageCollectedBlocks, + ) blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, blocksCleaned, blockCleanupFailures) - compactor, err := compact.NewBucketCompactor(logger, sy, grouper, compact.NewPlanner(logger, levels, noCompactMarkerFilter), comp, compactDir, bkt, conf.compactionConcurrency) + compactor, err := compact.NewBucketCompactor( + logger, + sy, + grouper, + compact.WithLargeTotalIndexSizeFilter( + compact.NewPlanner(logger, levels, noCompactMarkerFilter), + bkt, + int64(conf.maxBlockIndexSize), + blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename), + ), + comp, + compactDir, + bkt, + conf.compactionConcurrency, + ) if err != nil { cancel() return errors.Wrap(err, "create bucket compactor") @@ -373,7 +399,7 @@ func runCompact( return errors.Wrap(err, "sync before first pass of downsampling") } - if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, blocksMarkedForDeletion); err != nil { + if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, blocksMarked.WithLabelValues(metadata.DeletionMarkFilename)); err != nil { return errors.Wrap(err, "retention failed") } @@ -512,6 +538,7 @@ type compactConfig struct { selectorRelabelConf extflag.PathOrContent webConf webConfig label string + maxBlockIndexSize units.Base2Bytes } func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -574,6 +601,13 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { "This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication."). Hidden().StringsVar(&cc.dedupReplicaLabels) + // TODO(bwplotka): This is short term fix for https://github.com/thanos-io/thanos/issues/1424, replace with vertical block sharding https://github.com/thanos-io/thanos/pull/3390. + cmd.Flag("compact.block-max-index-size", "Maximum index size for the resulted block during any compaction. Note that"+ + "total size is approximated in worst case. If the block that would be resulted from compaction is estimated to exceed this number, biggest source"+ + "block is marked for no compaction (no-compact-mark.json is uploaded) which causes this block to be excluded from any compaction. "+ + "Default is due to https://github.com/thanos-io/thanos/issues/1424, but it's overall recommended to keeps block size to some reasonable size."). + Hidden().Default("64GB").BytesVar(&cc.maxBlockIndexSize) + cc.selectorRelabelConf = *extkingpin.RegisterSelectorRelabelFlags(cmd) cc.webConf.registerFlag(cmd) diff --git a/pkg/block/block.go b/pkg/block/block.go index d02ba26a7a..164f8f4ba7 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -284,3 +284,35 @@ func gatherFileStats(blockDir string) (res []metadata.File, _ error) { // TODO(bwplotka): Add optional files like tombstones? return res, err } + +// MarkForNoCompact creates a file which marks block to be not compacted. +func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, reason metadata.NoCompactReason, noCompactDetails string, markedForNoCompact prometheus.Counter) error { + m := path.Join(id.String(), metadata.NoCompactMarkFilename) + noCompactMarkExists, err := bkt.Exists(ctx, m) + if err != nil { + return errors.Wrapf(err, "check exists %s in bucket", m) + } + if noCompactMarkExists { + level.Warn(logger).Log("msg", "requested to mark for no compaction, but file already exists; this should not happen; investigate", "err", errors.Errorf("file %s already exists in bucket", m)) + return nil + } + + noCompactMark, err := json.Marshal(metadata.NoCompactMark{ + ID: id, + Version: metadata.NoCompactMarkVersion1, + + Time: time.Now().Unix(), + Reason: reason, + Details: noCompactDetails, + }) + if err != nil { + return errors.Wrap(err, "json encode no compact mark") + } + + if err := bkt.Upload(ctx, m, bytes.NewBuffer(noCompactMark)); err != nil { + return errors.Wrapf(err, "upload file %s to bucket", m) + } + markedForNoCompact.Inc() + level.Info(logger).Log("msg", "block has been marked for no compaction", "block", id) + return nil +} diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go index fb584bc758..15598eaa18 100644 --- a/pkg/block/block_test.go +++ b/pkg/block/block_test.go @@ -311,3 +311,59 @@ func TestMarkForDeletion(t *testing.T) { }) } } + +func TestMarkForNoCompact(t *testing.T) { + defer testutil.TolerantVerifyLeak(t) + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-block-mark-for-no-compact") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + for _, tcase := range []struct { + name string + preUpload func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) + + blocksMarked int + }{ + { + name: "block marked", + preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {}, + blocksMarked: 1, + }, + { + name: "block with no-compact mark already, expected log and no metric increment", + preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) { + m, err := json.Marshal(metadata.NoCompactMark{ + ID: id, + Time: time.Now().Unix(), + Version: metadata.NoCompactMarkVersion1, + }) + testutil.Ok(t, err) + testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.NoCompactMarkFilename), bytes.NewReader(m))) + }, + blocksMarked: 0, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + bkt := objstore.NewInMemBucket() + id, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "a", Value: "4"}}, + {{Name: "b", Value: "1"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124) + testutil.Ok(t, err) + + tcase.preUpload(t, id, bkt) + + testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, id.String()))) + + c := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + err = MarkForNoCompact(ctx, log.NewNopLogger(), bkt, id, metadata.ManualNoCompactReason, "", c) + testutil.Ok(t, err) + testutil.Equals(t, float64(tcase.blocksMarked), promtest.ToFloat64(c)) + }) + } +} diff --git a/pkg/block/metadata/markers.go b/pkg/block/metadata/markers.go index 075311192a..494544be36 100644 --- a/pkg/block/metadata/markers.go +++ b/pkg/block/metadata/markers.go @@ -74,6 +74,8 @@ type NoCompactMark struct { // Version of the file. Version int `json:"version"` + // Time is a unix timestamp of when the block was marked for no compact. + Time int64 `json:"time"` Reason NoCompactReason `json:"reason"` // Details is a human readable string giving details of reason. Details string `json:"details"` diff --git a/pkg/compact/planner.go b/pkg/compact/planner.go index 5d5c822874..208d4832a4 100644 --- a/pkg/compact/planner.go +++ b/pkg/compact/planner.go @@ -5,10 +5,17 @@ package compact import ( "context" + "fmt" + "math" + "path/filepath" "github.com/go-kit/kit/log" "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/objstore" ) type tsdbBasedPlanner struct { @@ -42,7 +49,10 @@ func NewPlanner(logger log.Logger, ranges []int64, noCompBlocks *GatherNoCompact // TODO(bwplotka): Consider smarter algorithm, this prefers smaller iterative compactions vs big single one: https://github.com/thanos-io/thanos/issues/3405 func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { - noCompactMarked := p.noCompBlocksFunc() + return p.plan(p.noCompBlocksFunc(), metasByMinTime) +} + +func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { notExcludedMetasByMinTime := make([]*metadata.Meta, 0, len(metasByMinTime)) for _, meta := range metasByMinTime { if _, excluded := noCompactMarked[meta.ULID]; excluded { @@ -212,3 +222,82 @@ func splitByRange(metasByMinTime []*metadata.Meta, tr int64) [][]*metadata.Meta return splitDirs } + +type largeTotalIndexSizeFilter struct { + *tsdbBasedPlanner + + bkt objstore.Bucket + markedForNoCompact prometheus.Counter + totalMaxIndexSizeBytes int64 +} + +var _ Planner = &largeTotalIndexSizeFilter{} + +// WithLargeTotalIndexSizeFilter wraps Planner with largeTotalIndexSizeFilter that checks the given plans and estimates total index size. +// When found, it marks block for no compaction by placing no-compact.json and updating cache. +// NOTE: The estimation is very rough as it assumes extreme cases of indexes sharing no bytes, thus summing all source index sizes. +// Adjust limit accordingly reducing to some % of actual limit you want to give. +// TODO(bwplotka): This is short term fix for https://github.com/thanos-io/thanos/issues/1424, replace with vertical block sharding https://github.com/thanos-io/thanos/pull/3390. +func WithLargeTotalIndexSizeFilter(with *tsdbBasedPlanner, bkt objstore.Bucket, totalMaxIndexSizeBytes int64, markedForNoCompact prometheus.Counter) *largeTotalIndexSizeFilter { + return &largeTotalIndexSizeFilter{tsdbBasedPlanner: with, bkt: bkt, totalMaxIndexSizeBytes: totalMaxIndexSizeBytes, markedForNoCompact: markedForNoCompact} +} + +func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { + noCompactMarked := t.noCompBlocksFunc() + copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked)) + for k, v := range noCompactMarked { + copiedNoCompactMarked[k] = v + } + +PlanLoop: + for { + plan, err := t.plan(copiedNoCompactMarked, metasByMinTime) + if err != nil { + return nil, err + } + var totalIndexBytes, maxIndexSize int64 = 0, math.MinInt64 + var biggestIndex int + for i, p := range plan { + indexSize := int64(-1) + for _, f := range p.Thanos.Files { + if f.RelPath == block.IndexFilename { + indexSize = f.SizeBytes + } + } + if indexSize <= 0 { + // Get size from bkt instead. + attr, err := t.bkt.Attributes(ctx, filepath.Join(p.ULID.String(), block.IndexFilename)) + if err != nil { + return nil, errors.Wrapf(err, "get attr of %v", filepath.Join(p.ULID.String(), block.IndexFilename)) + } + indexSize = attr.Size + } + + if maxIndexSize < indexSize { + maxIndexSize = indexSize + biggestIndex = i + } + totalIndexBytes += indexSize + if totalIndexBytes >= t.totalMaxIndexSizeBytes { + // Marking blocks for no compact to limit size. + // TODO(bwplotka): Make sure to reset cache once this is done: https://github.com/thanos-io/thanos/issues/3408 + if err := block.MarkForNoCompact( + ctx, + t.logger, + t.bkt, + plan[biggestIndex].ULID, + metadata.IndexSizeExceedingNoCompactReason, + fmt.Sprintf("largeTotalIndexSizeFilter: Total compacted block's index size could exceed: %v with this block. See https://github.com/thanos-io/thanos/issues/1424", t.totalMaxIndexSizeBytes), + t.markedForNoCompact, + ); err != nil { + return nil, errors.Wrapf(err, "mark %v for no compaction", plan[biggestIndex].ULID.String()) + } + // Make sure wrapped planner exclude this block. + copiedNoCompactMarked[plan[biggestIndex].ULID] = &metadata.NoCompactMark{ID: plan[biggestIndex].ULID, Version: metadata.NoCompactMarkVersion1} + continue PlanLoop + } + } + // Planned blocks should not exceed limit. + return plan, nil + } +} diff --git a/pkg/compact/planner_test.go b/pkg/compact/planner_test.go index be8d072909..3ac6a9ed77 100644 --- a/pkg/compact/planner_test.go +++ b/pkg/compact/planner_test.go @@ -4,6 +4,7 @@ package compact import ( + "bytes" "context" "io/ioutil" "os" @@ -14,8 +15,13 @@ import ( "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/testutil" ) @@ -438,7 +444,7 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) { } } -func TestTSDBBasedPlanners_PlanWithNoCompactMarkers(t *testing.T) { +func TestTSDBBasedPlanner_PlanWithNoCompactMarks(t *testing.T) { ranges := []int64{ 20, 60, @@ -629,3 +635,219 @@ func TestTSDBBasedPlanners_PlanWithNoCompactMarkers(t *testing.T) { }) } } + +func TestLargeTotalIndexSizeFilter_Plan(t *testing.T) { + ranges := []int64{ + 20, + 60, + 180, + 540, + 1620, + } + + bkt := objstore.NewInMemBucket() + g := &GatherNoCompactionMarkFilter{} + + marked := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + planner := WithLargeTotalIndexSizeFilter(NewPlanner(log.NewNopLogger(), ranges, g), bkt, 100, marked) + var lastMarkValue float64 + for _, c := range []struct { + name string + metas []*metadata.Meta + + expected []*metadata.Meta + expectedMarks float64 + }{ + { + name: "Outside range and excluded", + metas: []*metadata.Meta{ + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 100}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + }, + expectedMarks: 0, + }, + { + name: "Blocks to fill the entire parent, but with first one too large.", + metas: []*metadata.Meta{ + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 41}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}}, + }, + expectedMarks: 1, + expected: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, + }, + }, + { + name: "Blocks to fill the entire parent, but with second one too large.", + metas: []*metadata.Meta{ + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 41}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 20}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}}, + }, + expectedMarks: 1, + }, + { + name: "Blocks to fill the entire parent, but with last size exceeded (should not matter and not even marked).", + metas: []*metadata.Meta{ + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 90}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}}, + }, + expected: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, + }, + }, + { + name: "Blocks to fill the entire parent, but with pre-last one and first too large.", + metas: []*metadata.Meta{ + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 90}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 50}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 90}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 50, MaxTime: 60}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 90}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 60, MaxTime: 80}}, + }, + expected: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 50}}, + }, + expectedMarks: 2, + }, + { + name: `Block for the next parent range appeared, and we have a gap with size 20 between second and third block. + Second block is excluded.`, + metas: []*metadata.Meta{ + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 90}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 80, MaxTime: 100}}, + }, + expectedMarks: 1, + }, + { + name: "We have 20, 60, 20, 60, 240 range blocks. We could compact 20 + 60 + 60, but sixth 6th is excluded", + metas: []*metadata.Meta{ + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 960, MaxTime: 980}}, // Fresh one. + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 90}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 120, MaxTime: 180}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 720, MaxTime: 960}}, + }, + expected: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}}, + }, + expectedMarks: 1, + }, + // |--------------| + // |----------------| + // |--------------| + { + name: "Overlapping blocks 1, but total is too large", + metas: []*metadata.Meta{ + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 90}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 19, MaxTime: 40}}, + {Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: block.IndexFilename, SizeBytes: 30}}}, + BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, + }, + expectedMarks: 1, + }, + } { + if !t.Run(c.name, func(t *testing.T) { + t.Run("from meta", func(t *testing.T) { + obj := bkt.Objects() + for o := range obj { + delete(obj, o) + } + + metasByMinTime := make([]*metadata.Meta, len(c.metas)) + for i := range metasByMinTime { + orig := c.metas[i] + m := &metadata.Meta{} + *m = *orig + metasByMinTime[i] = m + } + sort.Slice(metasByMinTime, func(i, j int) bool { + return metasByMinTime[i].MinTime < metasByMinTime[j].MinTime + }) + + plan, err := planner.Plan(context.Background(), metasByMinTime) + testutil.Ok(t, err) + + for _, m := range plan { + // For less boilerplate. + m.Thanos = metadata.Thanos{} + } + testutil.Equals(t, c.expected, plan) + testutil.Equals(t, c.expectedMarks, promtest.ToFloat64(marked)-lastMarkValue) + lastMarkValue = promtest.ToFloat64(marked) + }) + t.Run("from bkt", func(t *testing.T) { + obj := bkt.Objects() + for o := range obj { + delete(obj, o) + } + + metasByMinTime := make([]*metadata.Meta, len(c.metas)) + for i := range metasByMinTime { + orig := c.metas[i] + m := &metadata.Meta{} + *m = *orig + metasByMinTime[i] = m + } + sort.Slice(metasByMinTime, func(i, j int) bool { + return metasByMinTime[i].MinTime < metasByMinTime[j].MinTime + }) + + for _, m := range metasByMinTime { + testutil.Ok(t, bkt.Upload(context.Background(), filepath.Join(m.ULID.String(), block.IndexFilename), bytes.NewReader(make([]byte, m.Thanos.Files[0].SizeBytes)))) + m.Thanos = metadata.Thanos{} + } + + plan, err := planner.Plan(context.Background(), metasByMinTime) + testutil.Ok(t, err) + testutil.Equals(t, c.expected, plan) + testutil.Equals(t, c.expectedMarks, promtest.ToFloat64(marked)-lastMarkValue) + + lastMarkValue = promtest.ToFloat64(marked) + }) + + }) { + return + } + } +} diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index 0a1954348f..b8af42b5e7 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -56,6 +56,8 @@ type blockDesc struct { extLset labels.Labels mint int64 maxt int64 + + markedForNoCompact bool } func (b *blockDesc) Create(ctx context.Context, dir string, delay time.Duration) (ulid.ULID, error) { @@ -131,6 +133,39 @@ func TestCompactWithStoreGateway(t *testing.T) { mint: timestamp.FromTime(now.Add(8 * time.Hour)), maxt: timestamp.FromTime(now.Add(10 * time.Hour)), }, + // Non overlapping blocks, ready for compaction, with one blocked marked for no-compact (no-compact-mark.json) + blockDesc{ + series: []labels.Labels{labels.FromStrings("a", "1", "b", "2")}, + extLset: labels.FromStrings("case", "compaction-ready-one-block-marked-for-no-compact", "replica", "1"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(2 * time.Hour)), + }, + blockDesc{ + series: []labels.Labels{labels.FromStrings("a", "1", "b", "3")}, + extLset: labels.FromStrings("case", "compaction-ready-one-block-marked-for-no-compact", "replica", "1"), + mint: timestamp.FromTime(now.Add(2 * time.Hour)), + maxt: timestamp.FromTime(now.Add(4 * time.Hour)), + }, + blockDesc{ + series: []labels.Labels{labels.FromStrings("a", "1", "b", "4")}, + extLset: labels.FromStrings("case", "compaction-ready-one-block-marked-for-no-compact", "replica", "1"), + mint: timestamp.FromTime(now.Add(4 * time.Hour)), + maxt: timestamp.FromTime(now.Add(6 * time.Hour)), + + markedForNoCompact: true, + }, + blockDesc{ + series: []labels.Labels{labels.FromStrings("a", "1", "b", "5")}, + extLset: labels.FromStrings("case", "compaction-ready-one-block-marked-for-no-compact", "replica", "1"), + mint: timestamp.FromTime(now.Add(6 * time.Hour)), + maxt: timestamp.FromTime(now.Add(8 * time.Hour)), + }, + blockDesc{ + series: []labels.Labels{labels.FromStrings("a", "1", "b", "6")}, + extLset: labels.FromStrings("case", "compaction-ready-one-block-marked-for-no-compact", "replica", "1"), + mint: timestamp.FromTime(now.Add(8 * time.Hour)), + maxt: timestamp.FromTime(now.Add(10 * time.Hour)), + }, // Non overlapping blocks, ready for compaction, only after deduplication. blockDesc{ @@ -314,6 +349,9 @@ func TestCompactWithStoreGateway(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, objstore.UploadDir(ctx, logger, bkt, path.Join(dir, id.String()), id.String())) rawBlockIDs[id] = struct{}{} + if b.markedForNoCompact { + testutil.Ok(t, block.MarkForNoCompact(ctx, logger, bkt, id, metadata.ManualNoCompactReason, "why not", promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) + } } { // On top of that, add couple of other tricky cases with different meta. @@ -415,6 +453,11 @@ func TestCompactWithStoreGateway(t *testing.T) { {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready", "replica": "1"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready", "replica": "1"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready-after-dedup", "replica": "1"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready-after-dedup", "replica": "1"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready-after-dedup", "replica": "1"}}, @@ -455,6 +498,11 @@ func TestCompactWithStoreGateway(t *testing.T) { {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready-one-block-marked-for-no-compact"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready-one-block-marked-for-no-compact"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "5", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, + {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "6", "case": "compaction-ready-one-block-marked-for-no-compact", "replica": "1"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "2", "case": "compaction-ready-after-dedup"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "3", "case": "compaction-ready-after-dedup"}}, {Value: 120, Metric: map[model.LabelName]model.LabelValue{"a": "1", "b": "4", "case": "compaction-ready-after-dedup"}}, @@ -503,7 +551,7 @@ func TestCompactWithStoreGateway(t *testing.T) { // We expect no ops. testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_iterations_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_marked_for_deletion_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_marked_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_vertical_compactions_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(1), "thanos_compact_group_compactions_failures_total")) @@ -532,20 +580,20 @@ func TestCompactWithStoreGateway(t *testing.T) { testutil.Ok(t, c.WaitSumMetrics(e2e.Greater(0), "thanos_compactor_iterations_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_cleaned_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2*4+2+2*3), "thanos_compactor_blocks_marked_for_deletion_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(2*4+2+2*3+2), "thanos_compactor_blocks_marked_total")) // 18. testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(5), "thanos_compact_group_compactions_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(6), "thanos_compact_group_compactions_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(3), "thanos_compact_group_vertical_compactions_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_failures_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(12), "thanos_compact_group_compaction_runs_started_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(12), "thanos_compact_group_compaction_runs_completed_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(14), "thanos_compact_group_compaction_runs_started_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(14), "thanos_compact_group_compaction_runs_completed_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_downsample_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_downsample_failures_total")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64( len(rawBlockIDs)+7+ - 5+ // 5 compactions, 5 newly added blocks. + 6+ // 6 compactions, 6 newly added blocks. -2, // Partial block removed. )), "thanos_blocks_meta_synced")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) @@ -566,7 +614,7 @@ func TestCompactWithStoreGateway(t *testing.T) { expectedEndVector, ) // Store view: - testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7+5-2)), "thanos_blocks_meta_synced")) + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7+6-2)), "thanos_blocks_meta_synced")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified")) }) @@ -579,20 +627,20 @@ func TestCompactWithStoreGateway(t *testing.T) { // NOTE: We cannot assert on intermediate `thanos_blocks_meta_` metrics as those are gauge and change dynamically due to many // compaction groups. Wait for at least first compaction iteration (next is in 5m). testutil.Ok(t, c.WaitSumMetrics(e2e.Greater(0), "thanos_compactor_iterations_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(16), "thanos_compactor_blocks_cleaned_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(18), "thanos_compactor_blocks_cleaned_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_block_cleanup_failures_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_marked_for_deletion_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_blocks_marked_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_aborted_partial_uploads_deletion_attempts_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_vertical_compactions_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_group_compactions_failures_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(6), "thanos_compact_group_compaction_runs_started_total")) - testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(6), "thanos_compact_group_compaction_runs_completed_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(7), "thanos_compact_group_compaction_runs_started_total")) + testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(7), "thanos_compact_group_compaction_runs_completed_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_downsample_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compact_downsample_failures_total")) - testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7+5-16-2)), "thanos_blocks_meta_synced")) + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7+6-18-2)), "thanos_blocks_meta_synced")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) testutil.Ok(t, c.WaitSumMetrics(e2e.Equals(0), "thanos_compactor_halted")) @@ -612,7 +660,7 @@ func TestCompactWithStoreGateway(t *testing.T) { ) // Store view: - testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7-16+5-2)), "thanos_blocks_meta_synced")) + testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(float64(len(rawBlockIDs)+7-18+6-2)), "thanos_blocks_meta_synced")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified")) }) diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index ef7f5bf211..90db116f03 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -212,7 +212,7 @@ func TestQueryExternalPrefix(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) - querierURL := urlParse(t, "http://"+q.HTTPEndpoint()+"/"+externalPrefix) + querierURL := mustURLParse(t, "http://"+q.HTTPEndpoint()+"/"+externalPrefix) querierProxy := httptest.NewServer(e2ethanos.NewSingleHostReverseProxy(querierURL, externalPrefix)) t.Cleanup(querierProxy.Close) @@ -241,7 +241,7 @@ func TestQueryExternalPrefixAndRoutePrefix(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) - querierURL := urlParse(t, "http://"+q.HTTPEndpoint()+"/"+routePrefix) + querierURL := mustURLParse(t, "http://"+q.HTTPEndpoint()+"/"+routePrefix) querierProxy := httptest.NewServer(e2ethanos.NewSingleHostReverseProxy(querierURL, externalPrefix)) t.Cleanup(querierProxy.Close) @@ -368,7 +368,7 @@ func checkNetworkRequests(t *testing.T, addr string) { testutil.Ok(t, err) } -func urlParse(t *testing.T, addr string) *url.URL { +func mustURLParse(t *testing.T, addr string) *url.URL { u, err := url.Parse(addr) testutil.Ok(t, err) @@ -384,7 +384,7 @@ func instantQuery(t *testing.T, ctx context.Context, addr string, q string, opts logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { - res, warnings, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+addr), q, time.Now(), opts) + res, warnings, err := promclient.NewDefaultClient().QueryInstant(ctx, mustURLParse(t, "http://"+addr), q, time.Now(), opts) if err != nil { return err } @@ -429,7 +429,7 @@ func labelNames(t *testing.T, ctx context.Context, addr string, start, end int64 logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { - res, err := promclient.NewDefaultClient().LabelNamesInGRPC(ctx, urlParse(t, "http://"+addr), start, end) + res, err := promclient.NewDefaultClient().LabelNamesInGRPC(ctx, mustURLParse(t, "http://"+addr), start, end) if err != nil { return err } @@ -448,7 +448,7 @@ func labelValues(t *testing.T, ctx context.Context, addr, label string, start, e logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { - res, err := promclient.NewDefaultClient().LabelValuesInGRPC(ctx, urlParse(t, "http://"+addr), label, start, end) + res, err := promclient.NewDefaultClient().LabelValuesInGRPC(ctx, mustURLParse(t, "http://"+addr), label, start, end) if err != nil { return err } @@ -466,7 +466,7 @@ func series(t *testing.T, ctx context.Context, addr string, matchers []storepb.L logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { - res, err := promclient.NewDefaultClient().SeriesInGRPC(ctx, urlParse(t, "http://"+addr), matchers, start, end) + res, err := promclient.NewDefaultClient().SeriesInGRPC(ctx, mustURLParse(t, "http://"+addr), matchers, start, end) if err != nil { return err } @@ -485,7 +485,7 @@ func rangeQuery(t *testing.T, ctx context.Context, addr string, q string, start, logger := log.NewLogfmtLogger(os.Stdout) logger = log.With(logger, "ts", log.DefaultTimestampUTC) testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { - res, warnings, err := promclient.NewDefaultClient().QueryRange(ctx, urlParse(t, "http://"+addr), q, start, end, step, opts) + res, warnings, err := promclient.NewDefaultClient().QueryRange(ctx, mustURLParse(t, "http://"+addr), q, start, end, step, opts) if err != nil { return err } diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index ba9b244fbe..fd119edc4c 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -12,7 +12,6 @@ import ( "io/ioutil" "net/http" "net/http/httptest" - "net/url" "os" "path/filepath" "sync" @@ -322,6 +321,7 @@ func TestRule_AlertmanagerHTTPClient(t *testing.T) { } func TestRule(t *testing.T) { + t.Skip("Flaky test. Fix it. See: https://github.com/thanos-io/thanos/issues/3425.") t.Parallel() s, err := e2e.NewScenario("e2e_test_rule") @@ -547,7 +547,7 @@ func TestRule(t *testing.T) { }, } - alrts, err := promclient.NewDefaultClient().AlertmanagerAlerts(ctx, mustUrlParse(t, "http://"+am2.HTTPEndpoint())) + alrts, err := promclient.NewDefaultClient().AlertmanagerAlerts(ctx, mustURLParse(t, "http://"+am2.HTTPEndpoint())) testutil.Ok(t, err) testutil.Equals(t, len(expAlertLabels), len(alrts)) @@ -556,12 +556,6 @@ func TestRule(t *testing.T) { } } -func mustUrlParse(t *testing.T, addr string) *url.URL { - u, err := url.Parse(addr) - testutil.Ok(t, err) - return u -} - // Test Ruler behavior on different storepb.PartialResponseStrategy when having partial response from single `failingStoreAPI`. func TestRulePartialResponse(t *testing.T) { t.Skip("TODO: Allow HTTP ports from binaries running on host to be accessible.") diff --git a/test/e2e/rules_api_test.go b/test/e2e/rules_api_test.go index 164e5bfbfd..d67c775e3a 100644 --- a/test/e2e/rules_api_test.go +++ b/test/e2e/rules_api_test.go @@ -135,7 +135,7 @@ func ruleAndAssert(t *testing.T, ctx context.Context, addr string, typ string, w fmt.Println("ruleAndAssert: Waiting for results for rules type", typ) var result []*rulespb.RuleGroup testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { - res, err := promclient.NewDefaultClient().RulesInGRPC(ctx, urlParse(t, "http://"+addr), typ) + res, err := promclient.NewDefaultClient().RulesInGRPC(ctx, mustURLParse(t, "http://"+addr), typ) if err != nil { return err }